Source code for pyelectro.analysis

# -*- coding: utf-8 -*-
"""
Module for mathematical analysis of voltage traces from electrophysiology.

AUTHOR: Mike Vella vellamike@gmail.com

"""

import scipy.stats
import numpy as np
import math
import logging
import sys
from scipy import interpolate
import operator

import pprint
    
pp = pprint.PrettyPrinter(indent=4)

logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)


# NOTE: print_comment / print_comment_v and MathsError are referenced
# throughout this module but are missing from this listing; the definitions
# below are minimal reconstructions consistent with how they are called.


def print_comment_v(text, warning=False):
    """Always print the message."""
    print_comment(text, True, warning)


def print_comment(text, print_it=False, warning=False):
    """Print a prefixed message, but only if print_it (verbose) is True."""
    prefix = "pyelectro >>> "
    if warning:
        prefix += "WARNING "
    if print_it:
        print("%s%s" % (prefix, text.replace("\n", "\n" + prefix)))


class MathsError(Exception):
    """Raised when an analysis calculation cannot be completed."""
    pass

def voltage_plot(t, v, title=None):
    """
    Plot electrophysiology recording.
    """
    from matplotlib import pyplot as plt

    plt.xlabel('Time (ms)')
    plt.ylabel('Voltage (mV)')
    plt.title(title)
    plt.grid()
    plt.plot(t, v)
    plt.show()

def smooth(x, window_len=11, window='hanning'):
    """Smooth the data using a window with requested size.

    This function is useful for smoothing out experimental data.
    This method utilises the convolution of a scaled window with the signal.
    The signal is prepared by introducing reflected copies of the signal
    (with the window size) at both ends so that transient parts are
    minimized at the beginning and end of the output signal.

    :param x: the input signal
    :param window_len: the dimension of the smoothing window; should be an odd integer
    :param window: the type of window from 'flat', 'hanning', 'hamming',
        'bartlett', 'blackman'. A flat window will produce a moving average smoothing.

    :return: smoothed signal

    example:

    .. code-block:: python

       t = np.arange(-2, 2, 0.1)
       x = np.sin(t) + np.random.randn(len(t)) * 0.1
       y = smooth(x)

    .. seealso::

       numpy.hanning, numpy.hamming, numpy.bartlett, numpy.blackman,
       numpy.convolve, scipy.signal.lfilter
    """
    if x.ndim != 1:
        raise ValueError("smooth only accepts 1 dimension arrays.")

    if x.size < window_len:
        raise ValueError("Input vector needs to be bigger than window size.")

    if window_len < 3:
        return x

    if window not in ['flat', 'hanning', 'hamming', 'bartlett', 'blackman']:
        raise ValueError("Window must be one of 'flat', 'hanning', 'hamming', 'bartlett', 'blackman'")

    # pad the signal with reflected copies of itself at both ends:
    s = np.r_[x[window_len - 1:0:-1], x, x[-1:-window_len:-1]]

    if window == 'flat':  # moving average
        w = np.ones(window_len, 'd')
    else:
        w = getattr(np, window)(window_len)

    y = np.convolve(w / w.sum(), s, mode='valid')

    # trim the padding so the output has the same length as the input:
    edge = window_len // 2
    return y[edge:-edge]

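# Illustrative usage sketch (not part of the original module): smoothing a
# noisy sine wave. The reflection padding means the output has the same
# length as the input.
#
#     t = np.arange(0, 10, 0.02)
#     noisy = np.sin(t) + np.random.randn(len(t)) * 0.1
#     clean = smooth(noisy, window_len=11, window='hanning')
#     assert len(clean) == len(noisy)
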
def linear_fit(t, y):
    """
    Fits data to a line.

    :param t: time vector
    :param y: variable which varies with time (such as voltage)

    :returns: gradient M for a formula of the type y = C + M * x
    """
    vals = np.array(y)
    m, C = np.polyfit(t, vals, 1)
    return m

def three_spike_adaptation(t, y):
    """
    Linear fit of amplitude vs time of the first three AP spikes.

    Initial action potential amplitudes may vary substantially and then
    settle down.

    :param t: time vector (AP times)
    :param y: corresponding AP amplitudes

    :returns: gradient M for a formula of the type y = C + M * x for the
        first three action potentials
    """
    t = np.array(t)
    y = np.array(y)

    t = t[0:3]
    y = y[0:3]

    m = linear_fit(t, y)

    return m

def exp_fit(t, y):
    """
    Fits data to an exponential.

    Returns K for a formula of the type y = A * exp(K * x)

    :param t: time vector
    :param y: variable which varies with time (such as voltage)
    """
    vals = np.array(y)
    C = np.min(vals)
    vals = vals - C + 1e-9  # make sure the data is all positive
    vals = np.log(vals)
    K, A_log = np.polyfit(t, vals, 1)

    return K

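# Illustrative sketch (not part of the original module): exp_fit shifts the
# data positive, takes logs and performs a linear fit, so for data of the
# form y = A*exp(K*t) it recovers roughly K. The data below are made up.
#
#     t = np.linspace(0, 20, 50)
#     y = 3.0 * np.exp(-0.7 * t)
#     K = exp_fit(t, y)   # expect K < 0, roughly the decay rate
#                         # (the min-shift makes the estimate approximate)
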
def window_peak_detector(v, delta=0.01):
    """
    Detects a peak by comparing the mean of either side of the window centre
    and deciding whether it exceeds some threshold.

    :return: Boolean, True if a peak is detected in that window
    """
    if len(v) % 2 == 0:
        raise Exception("Window length must be odd")

    middle_index = len(v) // 2
    middle_value = v[middle_index]

    left_mean = np.mean(v[0:middle_index])
    right_mean = np.mean(v[middle_index + 1:])

    left_elevation = middle_value - left_mean
    right_elevation = middle_value - right_mean

    left_exceeds_threshold = left_elevation > delta
    right_exceeds_threshold = right_elevation > delta

    return left_exceeds_threshold and right_exceeds_threshold

def centered_slice(v, index, length=5):
    """
    Returns a slice of the given length centred on index.
    """
    if length % 2 == 0:
        raise Exception("Window length must be odd")

    if len(v) < index + length // 2:
        raise Exception("Index too close to edge or window too big")

    start_index = index - length // 2
    return v[start_index:start_index + length]

def max_min_simple(a, times, delta=0, peak_threshold=0.0, verbose=False):
    """
    Find the maxima and minima of a voltage trace using a simple
    threshold-crossing scheme: each maximum is the largest value of a
    suprathreshold excursion, each minimum the smallest value between spikes.
    """
    print_comment("Calculating max_min_simple of a: (%s,...,%s)#%i, t: (%s,...,%s)#%i; thresh %s, delta %s"
                  % (a[0], a[-1], len(a), times[0], times[-1], len(times), peak_threshold, delta), verbose)

    maxima_locations = []
    maxima_number = 0
    maxima_times = []
    maxima_values = []
    minima_locations = []
    minima_number = 0
    minima_times = []
    minima_values = []

    spiking = False
    has_spiked = False

    last_max_loc = -1
    last_max_t = -1
    last_max_v = -1 * sys.float_info.max

    last_min_loc = -1
    last_min_t = -1
    last_min_v = sys.float_info.max

    for i in range(len(a)):
        t = times[i]
        v = a[i]

        if not spiking and v >= peak_threshold:
            print_comment('Spike of %s at %s' % (v, t), verbose)
            spiking = True
            has_spiked = True
            if last_min_loc > 0:
                minima_locations.append(last_min_loc)
                minima_times.append(last_min_t)
                minima_values.append(last_min_v)
                minima_number += 1
                last_min_loc = -1
                last_min_t = -1
                last_min_v = sys.float_info.max
        elif spiking and v < peak_threshold:
            spiking = False
            if last_max_loc > 0:
                maxima_locations.append(last_max_loc)
                maxima_times.append(last_max_t)
                maxima_values.append(last_max_v)
                maxima_number += 1
                last_max_loc = -1
                last_max_t = -1
                last_max_v = -1 * sys.float_info.max

        if spiking:
            if v >= last_max_v:
                last_max_loc = i
                last_max_t = t
                last_max_v = v
        elif has_spiked:
            if v <= last_min_v:
                last_min_loc = i
                last_min_t = t
                last_min_v = v

    turning_points = {'maxima_locations': maxima_locations,
                      'minima_locations': minima_locations,
                      'maxima_number': maxima_number,
                      'minima_number': minima_number,
                      'maxima_times': maxima_times,
                      'minima_times': minima_times,
                      'maxima_values': maxima_values,
                      'minima_values': minima_values}

    return turning_points

def max_min(a, t, delta=0, peak_threshold=0.0, verbose=False):
    """
    Find the maxima and minima of a voltage trace.

    .. note:: this method does not appear to be very robust when comparing
       to experimental data

    :param a: time-dependent variable (usually voltage)
    :param t: time-vector
    :param delta: the value by which a peak or trough has to exceed its
        neighbours to be considered outside of the noise
    :param peak_threshold: peaks below this value are discarded

    :return: turning_points, dictionary containing number of max, min and
        their locations

    .. note::

       The minimum value between two peaks is in some ways a better way of
       obtaining a minimum since it guarantees an answer; this may be
       something which should be implemented.
    """
    print_comment("Calculating max_min of a: (%s,...,%s)#%i, t: (%s,...,%s)#%i; thresh %s, delta %s"
                  % (a[0], a[-1], len(a), t[0], t[-1], len(t), peak_threshold, delta), verbose)

    gradients = np.diff(a)

    maxima_info = []
    minima_info = []

    count = 0

    for i in gradients[:-1]:
        count += 1

        if i > 0 and gradients[count] < 0 and i != gradients[count]:
            # found a maximum
            maximum_value = a[count]
            maximum_location = count
            maximum_time = t[count]
            preceding_point_value = a[maximum_location - 1]
            succeeding_point_value = a[maximum_location + 1]

            # filter out peaks which do not exceed both neighbours by delta,
            # or which fall below the peak threshold:
            maximum_valid = ((maximum_value - preceding_point_value) > delta
                             and (maximum_value - succeeding_point_value) > delta)

            if maximum_value < peak_threshold:
                maximum_valid = False

            if maximum_valid:
                maxima_info.append((maximum_value, maximum_location, maximum_time))

    maxima_num = len(maxima_info)

    if maxima_num > 0:
        minima_num = maxima_num - 1
    else:
        minima_num = 0

    values_getter = operator.itemgetter(0)
    location_getter = operator.itemgetter(1)
    time_getter = operator.itemgetter(2)

    maxima_locations = list(map(location_getter, maxima_info))
    maxima_times = list(map(time_getter, maxima_info))
    maxima_values = list(map(values_getter, maxima_info))

    for i in range(maxima_num - 1):
        # each minimum is the smallest value between two successive maxima:
        maximum_0_location = maxima_locations[i]
        maximum_1_location = maxima_locations[i + 1]

        interspike_slice = a[maximum_0_location:maximum_1_location]
        minimum_value = min(interspike_slice)
        minimum_location = list(interspike_slice).index(minimum_value) + maximum_0_location
        minimum_time = t[minimum_location]

        minima_info.append((minimum_value, minimum_location, minimum_time))

    minima_locations = list(map(location_getter, minima_info))
    minima_times = list(map(time_getter, minima_info))
    minima_values = list(map(values_getter, minima_info))

    turning_points = {'maxima_locations': maxima_locations,
                      'minima_locations': minima_locations,
                      'maxima_number': maxima_num,
                      'minima_number': minima_num,
                      'maxima_times': maxima_times,
                      'minima_times': minima_times,
                      'maxima_values': maxima_values,
                      'minima_values': minima_values}

    return turning_points

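# Illustrative usage sketch (not part of the original module): detecting
# peaks in a synthetic trace with Gaussian "spikes" every 20 ms.
#
#     t = np.arange(0, 100, 0.1)
#     v = -65 + 80 * np.exp(-((t % 20) - 10) ** 2)
#     info = max_min(v, t, delta=0.5, peak_threshold=0.0)
#     info['maxima_number']   # 5 peaks, at t = 10, 30, 50, 70, 90 ms
#     info['minima_number']   # 4 troughs, one between each pair of peaks
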
'''
PG removing this...

def max_min2(v, t, delta=0.1, peak_threshold=0.0, window_length=11):
    """
    Uses the max_min function but then does a second pass with the window
    peak detector to discard peaks. This is being prepared as an enhancement
    to the old peak detector.
    """
    max_min_dict = max_min(v, t, delta=0.0, peak_threshold=peak_threshold)

    maxima_locations = max_min_dict['maxima_locations']

    peak_mask = []

    for location in maxima_locations:
        slice = centered_slice(v, location, window_length)
        peak_flag = window_peak_detector(slice, delta=delta)
        peak_mask.append(peak_flag)

    # this anonymous function strips a list of all corresponding
    # non-zero elements in the mask:
    print("peak_mask: " + str(peak_mask))
    mask_filter = lambda l, mask: list(itertools.compress(l, mask))

    max_min_dict.pop('maxima_number', None)
    max_min_dict.pop('minima_number', None)

    dict_keys = max_min_dict.keys()

    for key in dict_keys:
        max_min_dict[key] = mask_filter(max_min_dict[key], peak_mask)

    max_min_dict['maxima_number'] = len(max_min_dict['maxima_locations'])
    max_min_dict['minima_number'] = max_min_dict['maxima_number'] - 1

    return max_min_dict
'''

def spike_frequencies(t):
    """
    Calculate frequencies associated with interspike times.

    :param t: a list of spike times in ms

    :return: list of frequencies in Hz associated with interspike times and
        times associated with the frequency (time of first spike in pair)
    """
    spike_times = np.array(t)
    interspike_times = np.diff(spike_times)
    interspike_frequencies = 1000 / interspike_times

    return [t[:-1], interspike_frequencies]

def max_min_interspike_time(t):
    """
    Calculate the maximum & minimum interspike interval from the list of
    maxima times.

    :param t: a list of spike times in ms

    :return: (max, min) interspike time
    """
    spike_times = np.array(t)
    interspike_times = np.diff(spike_times)

    return max(interspike_times), min(interspike_times)

def mean_spike_frequency(t):
    """
    Find the average frequency of spikes.

    :param t: a list of spike times in ms

    :return: mean spike frequency in Hz, calculated from mean interspike time
    """
    interspike_times = np.diff(t)
    mean_interspike_time = np.mean(interspike_times)
    mean_frequency = 1000.0 / mean_interspike_time  # factor of 1000 to give frequency in Hz

    if math.isnan(mean_frequency):
        mean_frequency = 0

    return mean_frequency

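# Worked example (illustrative): spikes at 0, 10, 20 and 30 ms give a mean
# interspike time of 10 ms, so the mean frequency is 1000/10 = 100 Hz.
#
#     mean_spike_frequency([0.0, 10.0, 20.0, 30.0])   # 100.0
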
def y_from_x(y, x, y_to_find):
    """
    Returns the list of x values at which the curve takes the value
    y_to_find, after a univariate spline interpolation.

    :param y: y-axis numerical data
    :param x: corresponding x-axis numerical data
    :param y_to_find: y value for which the corresponding x values are
        sought, interpolated from the nearest measured x/y value pairs

    :return: list of interpolated x values
    """
    # TODO: should have the ability to return indices, this should be a flag
    yreduced = np.array(y) - y_to_find
    freduced = interpolate.UnivariateSpline(x, yreduced, s=None)

    return freduced.roots()

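# Illustrative sketch (not part of the original module): find where a curve
# crosses a given level; note that the roots returned are x values.
#
#     x = np.linspace(0, 10, 100)
#     y = np.sin(x)
#     y_from_x(y, x, 0.5)   # approx [0.52, 2.62, 6.81, 8.90], where sin(x) = 0.5
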
def single_spike_width(y, t, baseline):
    """
    Find the width of a spike at a fixed height.

    Calculates the width of the spike at height baseline. If the spike shape
    does not intersect the height on both sides of the peak, the method
    returns 0. If the peak is below the baseline, 0 is also returned.

    The input must be a single spike or nonsense may be returned.
    Multiple-spike data can be handled by the spike_widths method.

    :param y: voltage trace (array) corresponding to the spike
    :param t: time value array corresponding to y
    :param baseline: the height (voltage) where the width is to be measured

    :return: width of spike at height defined by baseline
    """
    logger.debug('Baseline: %f' % baseline)

    try:
        y = np.array(y)
        t = np.array(t)

        value = np.max(y)
        location = np.argmax(y)

        logger.debug('Max voltage: %f' % value)
        logger.debug('Index of max: %f' % location)

        # moving left:
        while value > baseline:
            location -= 1
            value = y[location]
            undershoot_value = y[location + 1]
            overshoot_time = t[location]
            undershoot_time = t[location + 1]
            interpolated_left_time = np.interp(baseline,
                                               [value, undershoot_value],
                                               [overshoot_time, undershoot_time])

            if location < 0:
                raise MathsError('Baseline does not intersect spike')

        # now go right:
        value = np.max(y)
        location = np.argmax(y)

        while value > baseline:
            location += 1
            value = y[location]
            undershoot_value = y[location - 1]
            overshoot_time = t[location]
            undershoot_time = t[location - 1]
            interpolated_right_time = np.interp(baseline,
                                                [value, undershoot_value],
                                                [overshoot_time, undershoot_time])

            if location > len(y) - 1:
                raise MathsError('Baseline does not intersect spike')

        width = interpolated_right_time - interpolated_left_time

    except Exception:
        logger.warning('Single spike width algorithm failure - setting to 0')
        width = 0.0

    return width

def spike_widths(y, t, max_min_dictionary, baseline=0, delta=0):
    """
    Find the width of each spike at a fixed height in a train of spikes.

    Returns the width at height baseline for each spike in a spike train. If
    a spike shape does not intersect the height on both sides of its peak, 0
    is returned for that spike; likewise if the peak is below the baseline.

    :param y: voltage trace (array) corresponding to the spike train
    :param t: time value array corresponding to y
    :param max_min_dictionary: precalculated max_min dictionary
    :param baseline: the height (voltage) where the width is to be measured

    :return: [maxima_times, spike_widths]
    """
    max_num = max_min_dictionary['maxima_number']
    maxima_times = max_min_dictionary['maxima_times']
    minima_locations = max_min_dictionary['minima_locations']

    spike_widths = []

    for i in range(max_num):
        # need to splice down y to a single spike:
        if i == 0:
            left_min_location = 0
            right_min_location = minima_locations[i] + 1
        elif i == max_num - 1:
            left_min_location = minima_locations[i - 1]
            right_min_location = len(y)
        else:
            left_min_location = minima_locations[i - 1]
            right_min_location = minima_locations[i] + 1

        spike_shape = y[left_min_location:right_min_location]
        spike_t = t[left_min_location:right_min_location]

        try:
            width = single_spike_width(spike_shape, spike_t, baseline)
            logger.debug('Spike width: %f' % width)
        except Exception:
            logger.warning('Spike width set to 0, this indicates a problem')
            width = 0

        spike_widths.append(width)

    maxima_times_widths = [maxima_times, spike_widths]
    return maxima_times_widths

def burst_analyser(t):
    """
    Pearson's correlation coefficient applied to interspike times.

    :param t: rank-1 array containing spike times

    :return: Pearson's correlation coefficient of interspike times
    """
    x = np.arange(len(t))
    pearsonr = scipy.stats.pearsonr(x, t)[0]
    return pearsonr

def spike_covar(t):
    """
    Calculates the coefficient of variation of interspike times.

    :param t: rank-1 array containing spike times

    :return: coefficient of variation of interspike times
    """
    interspike_times = np.diff(t)
    covar = scipy.stats.variation(interspike_times)
    return covar

def inflexion_spike_detector(v, t, threshold=0.4, indices=False,
                             max_data_points=2000, voltage_threshold=-30):
    """
    Computes spike start and stop times based on extent of voltage deflection.

    This function requires some familiarity with Python to understand.

    :param indices: whether to return tuples of indices for each spike or times

    :return: list of tuples with start and end indices (or times) of every AP
    """
    v = smooth(v)

    voltage_derivative = np.diff(v)
    voltage_above_threshold = np.where(v > voltage_threshold)
    voltage_derivative_above_threshold = np.where(voltage_derivative > threshold)
    voltage_derivative_above_threshold = np.intersect1d(voltage_derivative_above_threshold[0],
                                                        voltage_above_threshold[0])
    voltage_derivative_above_threshold = (np.array(voltage_derivative_above_threshold),)

    logging.debug('Indices where voltage derivative exceeds threshold: %s'
                  % voltage_derivative_above_threshold)

    # this method actually sucks, we want the indices where a gap > 1,
    # use a reduce?
    diff_te = np.diff(voltage_derivative_above_threshold)

    initial_deflection_indices = np.where(diff_te > 1.0)[1]

    ap_initiation_indices = [voltage_derivative_above_threshold[0][i + 1]
                             for i in initial_deflection_indices]
    ap_initiation_indices = np.append(voltage_derivative_above_threshold[0][0],
                                      ap_initiation_indices)

    logging.debug('Indices where initial deflection occurs: %s' % ap_initiation_indices)

    ap_initiation_times = t[ap_initiation_indices]

    logging.debug('Times where initial deflection occurs: %s' % ap_initiation_times)

    # we now have the times and indices of all the AP initiations, need
    # to find the corresponding end indices
    nearest_index = lambda value, arr: np.abs(arr - value).argmin()

    ap_indices = []
    ap_times = []

    for ap_initiation_index in ap_initiation_indices:
        ap_start_time = t[ap_initiation_index]
        ap_start_voltage = v[ap_initiation_index]

        # offset prevents the AP start point itself from being found as the
        # corresponding crossing time
        offset = 10
        v_slice = v[ap_initiation_index + offset:ap_initiation_index + max_data_points]
        t_slice = t[ap_initiation_index + offset:ap_initiation_index + max_data_points]

        corresponding_times = y_from_x(v_slice, t_slice, ap_start_voltage)
        logger.debug('Corresponding times: %s' % corresponding_times)

        try:
            ap_end_time = corresponding_times[nearest_index(ap_start_time, corresponding_times)]
        except Exception:
            logger.critical('AP end time not found, AP start time: %f' % ap_start_time)
            ap_end_time = ap_start_time + 0.002  # TODO: this fix is nonsense
            # plt.plot(t,v)
            # plt.show()
            logger.critical('Corresponding times: %s' % corresponding_times)
            logger.critical('AP start time: %f' % ap_start_time)
            # voltage_plot(t,v,title='Error during spike detection')
            # voltage_plot(t[:-1],np.diff(v)*10)

        ap_end_index = nearest_index(ap_end_time, t)

        ap_times.append((ap_start_time, ap_end_time))
        ap_indices.append((ap_initiation_index, ap_end_index))

        logger.debug('Action potential start and end time: %f %f'
                     % (ap_start_time, ap_end_time))

    if indices:
        return_value = ap_indices
    else:
        return_value = ap_times

    return return_value

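# Usage sketch (illustrative): with indices=True each detected AP is returned
# as a (start_index, end_index) tuple, suitable for slicing v and t; with
# indices=False (the default) the same windows come back as (start, end) times.
#
#     ap_windows = inflexion_spike_detector(v, t, indices=True)
#     first_ap = v[ap_windows[0][0]:ap_windows[0][1]]
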
def ap_integrals(v, t):
    """
    Estimate the integral (area) of each action potential in a trace: each AP
    is zeroed to its own minimum and integrated by the trapezoidal rule.
    """
    logger.info('Estimating AP indices')
    ap_indices = inflexion_spike_detector(v, t, indices=True)
    logger.info('AP indices found')

    integrals = []
    for ap_index_tuple in ap_indices:
        ap = v[ap_index_tuple[0]:ap_index_tuple[1]]
        ap_zeroed = ap - ap.min()
        # assume constant timestep:
        dt = t[1] - t[0]
        # estimate integral using trapezoidal rule:
        integral = np.trapz(ap_zeroed, dx=dt)
        integrals.append(integral)
        logger.debug('AP integral calculated: %f' % integral)

    return np.array(integrals)

def broadening_index(v, t):
    """
    Estimate the broadening index of a spike train: the integral of the
    first AP divided by the mean integral of the following APs.
    """
    logger.info('Estimating integral values of spike train')
    integrals = ap_integrals(v, t)
    logger.info('AP integrals calculated')

    integral_0 = integrals[0]
    mean_remaining_integrals = np.mean(integrals[1:])
    bi = integral_0 / mean_remaining_integrals

    logger.debug('Broadening index: %f' % bi)

    return bi

def elburg_bursting(spike_times):
    """
    Bursting measure B as described by van Elburg & van Ooyen 2004.

    :param spike_times: sequence of spike times

    :return: bursting measure B
    """
    interspikes_1 = np.diff(spike_times)

    num_interspikes = len(spike_times) - 1

    # sums of adjacent pairs of interspike intervals:
    interspikes_2 = []
    for i in range(num_interspikes - 1):
        interspike = interspikes_1[i] + interspikes_1[i + 1]
        interspikes_2.append(interspike)

    mean_interspike = np.mean(interspikes_1)

    var_i_1 = np.var(interspikes_1)
    var_i_2 = np.var(interspikes_2)

    B = (2 * var_i_1 - var_i_2) / (2 * mean_interspike ** 2)

    return B

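# Worked sketch (illustrative, made-up spike times): for a perfectly regular
# train both interval variances are zero, so B = 0; grouped spikes give B > 0.
#
#     regular = np.arange(0, 1000, 50.0)
#     elburg_bursting(regular)                           # 0.0
#     bursty = [0, 5, 10, 100, 105, 110, 200, 205, 210]
#     elburg_bursting(bursty)                            # ~0.68, clearly bursting
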
def load_csv_data(file_path, delimiter=',', plot=False):
    """Extracts time and voltage data from a csv file.

    Data must be in a csv with two columns: first time, second voltage.
    Units should be SI (Volts and Seconds).

    :param file_path: full file path to file e.g /home/mike/test.csv

    :return: two lists - time and voltage
    """
    import csv

    t = []
    v = []

    with open(file_path, 'r') as csv_file:
        csv_reader = csv.reader(csv_file, delimiter=delimiter)

        i = 0
        warnings_left = 5
        for row in csv_reader:
            try:
                t_value = float(row[0]) * 1000  # convert to ms
                v_value = float(row[1]) * 1000  # convert to mV
                t.append(t_value)
                v.append(v_value)
            except Exception:
                if warnings_left > 0:
                    print_comment_v('Row %i invalid in %s: %s, delimiter = [%s]'
                                    % (i, file_path, row, delimiter))
                    warnings_left -= 1
                elif warnings_left == 0:
                    print_comment_v('Suppressing further warnings about %s' % file_path)
                    warnings_left -= 1
            i += 1

    if plot:
        from matplotlib import pyplot
        pyplot.plot(t, v)
        pyplot.title('Raw data')
        pyplot.xlabel('Time (ms)')
        pyplot.ylabel('Voltage (mV)')
        pyplot.show()

    return t, v

def phase_plane(t, y, plot=False):  # plotting should really be separated out
    """
    Return a tuple with two vectors corresponding to the phase plane of
    the trace.
    """
    dv = np.diff(y)
    dt = np.diff(t)
    dy_dt = dv / dt

    y = list(y)
    y = y[:-1]

    if plot:
        from matplotlib import pyplot
        pyplot.title('Phase Plot')
        pyplot.ylabel('dV/dt')
        pyplot.xlabel('Voltage (mV)')
        pyplot.plot(y, dy_dt)
        pyplot.show()

    return [y, dy_dt]

def filter(t, v):  # still experimental
    # crude low-pass filter: zero out all FFT components from index 500
    # upwards, then invert the transform. (numpy's FFT is used here; the
    # original called scipy.fft(v), which is a module, not a function, in
    # modern SciPy.)
    fft = np.fft.fft(v)
    bp = fft.copy()
    for i in range(len(bp)):
        if i >= 500:
            bp[i] = 0
    ibp = np.fft.ifft(bp)
    return ibp

def pptd(t, y, bins=10, xyrange=None, dvdt_threshold=None, plot=False):
    """
    Returns a 2D map of x vs y data together with the xedges and yedges,
    as a list [H, xedges, yedges]. Useful for the PPTD method described by
    Van Geit 2007.
    """
    phase_space = phase_plane(t, y)

    # filter the phase space data:
    phase_dvdt_new = []
    phase_v_new = []

    if dvdt_threshold is not None:
        i = 0
        for dvdt in phase_space[1]:
            if dvdt > dvdt_threshold:
                phase_dvdt_new.append(phase_space[1][i])
                phase_v_new.append(phase_space[0][i])
            i += 1
        phase_space[1] = phase_dvdt_new
        phase_space[0] = phase_v_new

    # NB: the original had these branches swapped, so a supplied xyrange was
    # silently ignored:
    if xyrange is not None:
        density_map = np.histogram2d(phase_space[1], phase_space[0],
                                     bins=bins, range=xyrange, weights=None)
    else:
        density_map = np.histogram2d(phase_space[1], phase_space[0],
                                     bins=bins, weights=None)

    # Reverse the density map (probably not necessary as
    # it's being done because imshow has a funny origin):
    density = density_map[0][::-1]
    xedges = density_map[1]
    yedges = density_map[2]

    if plot:
        from matplotlib import pyplot
        extent = [yedges[0], yedges[-1], xedges[0], xedges[-1]]
        imgplot = pyplot.imshow(density, extent=extent)
        imgplot.set_interpolation('nearest')  # makes image pixelated
        pyplot.title('Phase Plane Trajectory Density')
        pyplot.ylabel('dV/dt')
        pyplot.xlabel('Voltage (mV)')
        pyplot.colorbar()
        pyplot.show()

    return [density, xedges, yedges]

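# Usage sketch (illustrative): build a phase plane trajectory density map for
# a trace; the returned list is [density, xedges, yedges].
#
#     density, xedges, yedges = pptd(t, v, bins=50, plot=True)
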
def spike_broadening(spike_width_list):
    """
    Returns the width of the first AP over the mean width of the
    following APs.
    """
    first_spike = spike_width_list[0]

    if first_spike < 1e-6:
        logger.warning('First spike width <1e-6s, this indicates a problem')

    mean_following_spikes = np.mean(spike_width_list[1:])
    broadening = first_spike / mean_following_spikes

    logger.debug('Spike widths: %s' % spike_width_list)
    logger.debug('First spike: %f, Mean of following spikes: %f'
                 % (first_spike, mean_following_spikes))
    logger.debug('Spike broadening estimate: %f' % broadening)

    return broadening

def pptd_error(t_model, v_model, t_target, v_target, dvdt_threshold=None):
    """
    Returns the error function value from comparison of two phase plane
    trajectory density (PPTD) maps, as described by Van Geit 2007.
    """
    pptd_data = pptd(t_target, v_target, dvdt_threshold=dvdt_threshold)
    target_density_map = pptd_data[0]

    xedges = pptd_data[1]
    xmin = xedges[0]
    xmax = xedges[-1]
    yedges = pptd_data[2]  # NB: the original read pptd_data[1] here again
    ymin = yedges[0]
    ymax = yedges[-1]

    xyrng = [[xmin, xmax], [ymin, ymax]]

    model_density_map = pptd(t_model, v_model, xyrange=xyrng,
                             dvdt_threshold=dvdt_threshold)[0]

    # calculate number of data points for the model and target:
    N_target = sum(sum(target_density_map))
    N_model = sum(sum(model_density_map))

    # normalise each map:
    normalised_target_density_map = target_density_map / float(N_target)
    normalised_model_density_map = model_density_map / float(N_model)

    # calculate the differences and take the modulus:
    difference_matrix = normalised_target_density_map - normalised_model_density_map
    difference_matrix = abs(difference_matrix)

    # root each value:
    root_matrix = difference_matrix ** 0.5

    # sum each element:
    summed_matrix = sum(sum(root_matrix))

    # calculate the error:
    error = summed_matrix ** 2

    print_comment_v('pptd error: %s' % error)

    return error

def minima_phases(max_min_dictionary):
    """
    Find the phases of minima.

    Minima are found by finding the minimum value between sets of two peaks.
    The phase of the minimum relative to the two peaks is then returned,
    i.e. the fraction of time elapsed between the two peaks when the minimum
    occurs.

    It is very important to make sure the correct delta is specified for
    peak discrimination, otherwise unexpected results may be returned.

    :param max_min_dictionary: max_min dictionary

    :return: phase of each minimum relative to its surrounding peaks
    """
    minima_num = max_min_dictionary['minima_number']
    maxima_num = max_min_dictionary['maxima_number']
    maxima_times = max_min_dictionary['maxima_times']
    minima_times = max_min_dictionary['minima_times']

    minima_phases = []

    for i in range(min(minima_num, maxima_num - 1)):
        maximum_0_t = maxima_times[i]
        maximum_1_t = maxima_times[i + 1]
        minimum_time = minima_times[i]
        phase = (minimum_time - maximum_0_t) / (maximum_1_t - maximum_0_t)
        minima_phases.append(phase)

    phase_list = [minima_times, minima_phases]

    return phase_list

class TraceAnalysis(object):
    """
    Base class for analysis of electrophysiology data.

    Constructor for the TraceAnalysis base class takes the following arguments:

    :param v: time-dependent variable (usually voltage)
    :param t: time array (1-to-1 correspondence with v)
    :param start_analysis: time in v,t where analysis is to start
    :param end_analysis: time in v,t where analysis is to end
    """

    def __init__(self, v, t, start_analysis=0, end_analysis=None):

        self.v = np.array(v)
        self.t = np.array(t)

        if end_analysis is None:
            end_analysis = t[-1]

        start_index = self.__nearest_index(self.t, start_analysis)
        end_index = self.__nearest_index(self.t, end_analysis)

        if end_analysis is not None or start_analysis != 0:
            self.v = v[start_index:end_index]
            self.t = t[start_index:end_index]

    def __nearest_index(self, array, target_value):
        """Finds index of first nearest value to target_value in array"""
        nparray = np.array(array)
        differences = np.abs(nparray - target_value)
        min_difference = differences.min()
        index = np.nonzero(differences == min_difference)[0][0]

        return index

    def plot_trace(self, save_fig=False, trace_name='voltage_trace.png', show_plot=True):
        """
        Plot the trace and save it if requested by user.
        """
        if save_fig or show_plot:
            import matplotlib.pyplot as plt

            plt.plot(self.t, self.v)
            plt.xlabel('Time (ms)')
            plt.ylabel('Voltage (mV)')

            if save_fig:
                plt.savefig(trace_name)

            if show_plot:
                plt.show()

class IClampAnalysis(TraceAnalysis):
    """Analysis class for data from whole cell current injection experiments.

    This is designed to work with simulations of spiking cells or current
    clamp experimental data. A lot of the logic here is hardcoded to work
    well with Cortical Layer II/III Pyramidal cells in rats.

    :param v: time-dependent variable (usually voltage)
    :param t: time-vector
    :param analysis_var: dictionary containing parameters to be used
        in analysis such as delta for peak detection
    :param start_analysis: time t where analysis is to start
    :param end_analysis: time in t where analysis is to end
    """

    def __init__(self,
                 v,
                 t,
                 analysis_var,
                 start_analysis=0,
                 end_analysis=None,
                 target_data_path=None,
                 smooth_data=False,
                 show_smoothed_data=False,
                 smoothing_window_len=11,
                 max_min_method=max_min,
                 verbose=False):

        # call the parent constructor to prepare the v,t vectors:
        super(IClampAnalysis, self).__init__(v, t, start_analysis, end_analysis)

        self.verbose = verbose

        if smooth_data:
            self.v = smooth(self.v, window_len=smoothing_window_len)

        if show_smoothed_data:
            from matplotlib import pyplot as plt
            plt.plot(self.t, self.v)
            plt.show()

        self.delta = analysis_var['peak_delta']
        self.baseline = analysis_var['baseline']
        self.dvdt_threshold = analysis_var['dvdt_threshold']

        self.target_data_path = target_data_path

        if "peak_threshold" in analysis_var.keys():
            peak_threshold = analysis_var["peak_threshold"]
        else:
            peak_threshold = None

        self.max_min_dictionary = max_min_method(self.v,
                                                 self.t,
                                                 self.delta,
                                                 peak_threshold=peak_threshold,
                                                 verbose=self.verbose)

        print_comment('Max min dictionary calculated', verbose)

    __error_during_analysis = False  # hacky way of doing this. TODO: fix

    @property
    def analysable_data(self):
        if self.max_min_dictionary['maxima_number'] < 3:
            analysable = False
            print_comment_v("Cannot analyse data: too few maxima (%i) in data: %s"
                            % (self.max_min_dictionary['maxima_number'], self.max_min_dictionary))
        elif max(self.v) > 100.0:
            analysable = False
            print_comment_v("Cannot analyse data: max of v (%f) > 100" % max(self.v))
        elif min(self.v) > -5.0:
            analysable = False
            print_comment_v("Cannot analyse data: min of v (%f) > -5" % min(self.v))
        elif max(self.v) < 10.0:
            analysable = False
            print_comment_v("Cannot analyse data: max of v (%f) < 10" % max(self.v))
        elif self.__error_during_analysis:
            analysable = False
            print_comment_v("Cannot analyse data: error during analysis...")
        else:
            analysable = True

        return analysable

    @analysable_data.setter
    def analysable_data(self, val):
        self.__error_during_analysis = True

    def plot_results(self):
        """
        Represent the results visually.
        """
        import matplotlib.pyplot as plt

        minima_times = self.max_min_dictionary['minima_times']
        maxima_times = self.max_min_dictionary['maxima_times']

        for time in minima_times:
            plt.axvline(x=time)

        for time in maxima_times:
            plt.axvline(x=time, color='r')

        plt.xlabel('Time (ms)')
        plt.ylabel('Voltage (mV)')
        plt.plot(self.t, self.v)
        plt.show()

    def analyse(self):
        """If the data is analysable, analyses it and puts all results into a dict."""

        if self.analysable_data:
            analysis_results = {}
            max_min_dictionary = self.max_min_dictionary

            analysis_results['average_minimum'] = np.average(max_min_dictionary['minima_values'])
            analysis_results['average_maximum'] = np.average(max_min_dictionary['maxima_values'])
            analysis_results['min_peak_no'] = max_min_dictionary['minima_number']
            analysis_results['max_peak_no'] = max_min_dictionary['maxima_number']
            analysis_results['mean_spike_frequency'] = mean_spike_frequency(max_min_dictionary['maxima_times'])
            analysis_results['interspike_time_covar'] = spike_covar(max_min_dictionary['maxima_times'])
            analysis_results['first_spike_time'] = max_min_dictionary['maxima_times'][0]

            max_min_isi = max_min_interspike_time(max_min_dictionary['maxima_times'])
            analysis_results['max_interspike_time'] = max_min_isi[0]
            analysis_results['min_interspike_time'] = max_min_isi[1]

            trough_phases = minima_phases(max_min_dictionary)
            try:
                analysis_results['trough_phase_adaptation'] = exp_fit(trough_phases[0], trough_phases[1])
            except Exception:
                logging.warning('trough_phase_adaptation raising an error')

            spike_width_list = spike_widths(self.v, self.t, max_min_dictionary, self.baseline, self.delta)
            try:
                analysis_results['spike_width_adaptation'] = exp_fit(spike_width_list[0], spike_width_list[1])
            except Exception:
                logging.warning('spike_width_adaptation raising an exception, exp_fit looks problematic')

            spike_frequency_list = spike_frequencies(max_min_dictionary['maxima_times'])
            analysis_results['peak_decay_exponent'] = three_spike_adaptation(max_min_dictionary['maxima_times'],
                                                                             max_min_dictionary['maxima_values'])
            analysis_results['trough_decay_exponent'] = three_spike_adaptation(max_min_dictionary['minima_times'],
                                                                               max_min_dictionary['minima_values'])
            analysis_results['spike_frequency_adaptation'] = exp_fit(spike_frequency_list[0],
                                                                     spike_frequency_list[1])
            analysis_results['spike_broadening'] = spike_broadening(spike_width_list[1])
            analysis_results['peak_linear_gradient'] = linear_fit(max_min_dictionary["maxima_times"],
                                                                  max_min_dictionary["maxima_values"])
            # analysis_results['broadening_index'] = broadening_index(self.v, self.t)

            # PPTD needs to be compared directly with experimental data:
            if self.target_data_path is not None and len(self.target_data_path) > 0:
                t_experimental, v_experimental = load_csv_data(self.target_data_path)
                try:
                    analysis_results['pptd_error'] = pptd_error(self.t, self.v,
                                                                t_experimental, v_experimental,
                                                                dvdt_threshold=self.dvdt_threshold)
                except Exception:
                    print_comment_v('WARNING PPTD failure')
                    analysis_results['pptd_error'] = 1

            self.analysis_results = analysis_results
        else:
            self.analysis_results = None
            print_comment_v('Data not suitable for analysis', True)

        print_comment('Analysis complete', self.verbose)

        return self.analysis_results

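# Usage sketch (illustrative; the parameter values below are made up, not
# recommendations): analysis_var must supply 'peak_delta', 'baseline' and
# 'dvdt_threshold', and may supply 'peak_threshold'.
#
#     analysis_var = {'peak_delta': 0.1, 'baseline': 0.0,
#                     'dvdt_threshold': 0.0, 'peak_threshold': 0.0}
#     ica = IClampAnalysis(v, t, analysis_var,
#                          start_analysis=150, end_analysis=900)
#     results = ica.analyse()   # dict of metrics, or None if not analysable
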
class NetworkAnalysis(object):
    """Analysis class for networks of spiking cells, mainly simulation data.

    :param volts: dict of reference vs. time-dependent variable (usually voltage)
    :param t: time-vector
    :param analysis_var: dictionary containing parameters to be used
        in analysis such as delta for peak detection
    :param start_analysis: time t where analysis is to start
    :param end_analysis: time in t where analysis is to end
    """

    def __init__(self,
                 volts,
                 t,
                 analysis_var,
                 start_analysis=0,
                 end_analysis=None,
                 smooth_data=False,
                 show_smoothed_data=False,
                 smoothing_window_len=11,
                 verbose=False):

        self.volts = volts
        if not isinstance(self.volts, dict):
            raise ValueError("NetworkAnalysis requires a dict of y values with reference vs. voltage trace")

        for ref in self.volts.keys():
            if not len(t) == len(self.volts[ref]):
                raise ValueError("One of the voltage traces (%s) has a different length to the time trace (%s != %s)!"
                                 % (ref, len(self.volts[ref]), len(t)))

        self.t = t
        self.verbose = verbose

        if smooth_data:
            for ref in volts.keys():
                # TODO improve this craziness
                self.volts[ref] = smooth(np.array(self.volts[ref]),
                                         window_len=smoothing_window_len).tolist()

        if show_smoothed_data:
            from matplotlib import pyplot as plt
            for ref in volts.keys():
                plt.plot(self.t, self.volts[ref], label=ref)
            plt.legend()
            plt.show()

        start_index = self.__nearest_index(self.t, start_analysis)

        if end_analysis is None:
            end_analysis = t[-1]
            end_index = len(self.t) - 1
        else:
            end_index = self.__nearest_index(self.t, end_analysis)

        if end_analysis is not None or start_analysis != 0:
            self.t = t[start_index:end_index + 1]
            for ref in volts.keys():
                self.volts[ref] = volts[ref][start_index:end_index + 1]

        self.delta = analysis_var['peak_delta']
        self.baseline = analysis_var['baseline']
        self.dvdt_threshold = analysis_var['dvdt_threshold']

        if "peak_threshold" in analysis_var.keys():
            peak_threshold = analysis_var["peak_threshold"]
        else:
            peak_threshold = None

        self.max_min_dictionaries = {}

        for ref in self.volts.keys():
            max_min_dict = max_min_simple(self.volts[ref],
                                          self.t,
                                          self.delta,
                                          peak_threshold=peak_threshold,
                                          verbose=self.verbose)
            self.max_min_dictionaries[ref] = max_min_dict

    def __nearest_index(self, array, target_value):
        """Finds index of first nearest value to target_value in array"""
        nparray = np.array(array)
        differences = np.abs(nparray - target_value)
        min_difference = differences.min()
        index = np.nonzero(differences == min_difference)[0][0]

        return index

    '''
    targets:       the standard targets to evaluate (min_peak_no, minimum,
                   spike_broadening, etc.). If None, evaluate all.
    extra_targets: used if targets==None for specifying additional targets,
                   e.g. cell0:value_100
    '''

    def analyse(self, targets=None, extra_targets=None):
        """Analyses and puts all results into a dict."""

        analysis_results = {}

        for ref in self.volts.keys():

            max_min_dictionary = self.max_min_dictionaries[ref]
            print_comment('Analysing data with %i maxima, %i minima %s'
                          % (max_min_dictionary['maxima_number'],
                             max_min_dictionary['minima_number'],
                             '(targets: %s)' % targets if targets else ''), self.verbose)
            v = self.volts[ref]
            pre = '%s:' % ref

            # renamed from max/min to avoid shadowing the builtins:
            max_v = -1 * sys.float_info.max
            min_v = sys.float_info.max

            for val in v:
                if val > max_v:
                    max_v = val
                if val < min_v:
                    min_v = val

            if targets is None or pre + 'maximum' in targets:
                analysis_results[pre + 'maximum'] = max_v
            if targets is None or pre + 'minimum' in targets:
                analysis_results[pre + 'minimum'] = min_v

            print_comment('Max: %s, min %s' % (max_v, min_v), self.verbose)

            if targets is None or pre + 'min_peak_no' in targets:
                analysis_results[pre + 'min_peak_no'] = max_min_dictionary['minima_number']
            if targets is None or pre + 'max_peak_no' in targets:
                analysis_results[pre + 'max_peak_no'] = max_min_dictionary['maxima_number']

            if max_min_dictionary['maxima_number'] >= 1:
                if targets is None or pre + 'average_maximum' in targets:
                    analysis_results[pre + 'average_maximum'] = np.average(max_min_dictionary['maxima_values'])
                if targets is None or pre + 'first_spike_time' in targets:
                    analysis_results[pre + 'first_spike_time'] = max_min_dictionary['maxima_times'][0]

            if max_min_dictionary['minima_number'] >= 1:
                if targets is None or pre + 'average_minimum' in targets:
                    analysis_results[pre + 'average_minimum'] = np.average(max_min_dictionary['minima_values'])

            if max_min_dictionary['maxima_number'] >= 3:

                if targets is None or pre + 'mean_spike_frequency' in targets:
                    analysis_results[pre + 'mean_spike_frequency'] = mean_spike_frequency(max_min_dictionary['maxima_times'])

                if targets is None or pre + 'interspike_time_covar' in targets:
                    analysis_results[pre + 'interspike_time_covar'] = spike_covar(max_min_dictionary['maxima_times'])

                if targets is None or pre + 'trough_phase_adaptation' in targets:
                    trough_phases = minima_phases(max_min_dictionary)
                    try:
                        analysis_results[pre + 'trough_phase_adaptation'] = exp_fit(trough_phases[0], trough_phases[1])
                    except Exception:
                        logging.warning('trough_phase_adaptation raising an error')

                if targets is None or pre + 'spike_broadening' in targets or pre + 'spike_width_adaptation' in targets:
                    spike_width_list = spike_widths(v, self.t, max_min_dictionary, self.baseline, self.delta)
                    if len(spike_width_list) >= 2 and len(spike_width_list[0]) > 0:
                        if targets is None or pre + 'spike_broadening' in targets:
                            analysis_results[pre + 'spike_broadening'] = spike_broadening(spike_width_list[1])
                        if targets is None or pre + 'spike_width_adaptation' in targets:
                            try:
                                analysis_results[pre + 'spike_width_adaptation'] = exp_fit(spike_width_list[0], spike_width_list[1])
                            except Exception:
                                logging.warning('spike_width_adaptation raising an exception, exp_fit looks problematic')
                    else:
                        logging.warning('spike_width_list does not have enough points for calculating '
                                        'spike_width_adaptation or spike_broadening: %s' % spike_width_list)

                max_min_isi = max_min_interspike_time(max_min_dictionary['maxima_times'])
                if targets is None or pre + 'max_interspike_time' in targets:
                    analysis_results[pre + 'max_interspike_time'] = max_min_isi[0]
                if targets is None or pre + 'min_interspike_time' in targets:
                    analysis_results[pre + 'min_interspike_time'] = max_min_isi[1]

                if targets is None or pre + 'peak_decay_exponent' in targets or pre + 'spike_frequency_adaptation' in targets:
                    spike_frequency_list = spike_frequencies(max_min_dictionary['maxima_times'])
                    if targets is None or pre + 'peak_decay_exponent' in targets:
                        analysis_results[pre + 'peak_decay_exponent'] = three_spike_adaptation(max_min_dictionary['maxima_times'],
                                                                                               max_min_dictionary['maxima_values'])
                    if targets is None or pre + 'spike_frequency_adaptation' in targets:
                        analysis_results[pre + 'spike_frequency_adaptation'] = exp_fit(spike_frequency_list[0],
                                                                                       spike_frequency_list[1])

                if targets is None or pre + 'trough_decay_exponent' in targets:
                    analysis_results[pre + 'trough_decay_exponent'] = three_spike_adaptation(max_min_dictionary['minima_times'],
                                                                                             max_min_dictionary['minima_values'])

                if targets is None or pre + 'peak_linear_gradient' in targets:
                    analysis_results[pre + 'peak_linear_gradient'] = linear_fit(max_min_dictionary["maxima_times"],
                                                                                max_min_dictionary["maxima_values"])

            if targets is None or pre + 'average_last_1percent' in targets:
                num_points_to_ave = int(len(v) / 100.0)
                last_vs = v[len(v) - num_points_to_ave:]
                ave = 0
                for vv in last_vs:
                    ave += vv
                ave = ave / len(last_vs)
                print_comment("Getting average of last %i points (%s->%s) of all %i (%s->%s): %s"
                              % (len(last_vs), last_vs[0], last_vs[-1], len(v), v[0], v[-1], ave), self.verbose)
                analysis_results[pre + 'average_last_1percent'] = ave

            other_targets = []
            if targets is not None:
                other_targets.extend(targets)
            if extra_targets is not None:
                other_targets.extend(extra_targets)

            for target in other_targets:

                # e.g. cell0:value_100 => value at 100 ms
                if target.startswith(pre + "value_"):
                    target_time = float(target.split(':')[1].split('_')[1])
                    value = v[0]  # fallback if target_time precedes the first sample
                    i = 0
                    while self.t[i] < target_time:
                        value = v[i]
                        i += 1
                    analysis_results[target] = value

                # e.g. cell0:average_100_200 => average value between 100 ms & 200 ms
                if target.startswith(pre + "average_"):
                    try:
                        start_time = float(target.split(':')[1].split('_')[1])
                        end_time = float(target.split(':')[1].split('_')[2])
                        average = 0
                        num = 0
                        for i in range(len(self.t)):
                            if self.t[i] >= start_time and self.t[i] <= end_time:
                                average += v[i]
                                num += 1
                        if num > 0:
                            average = average / num
                            analysis_results[target] = average
                    except ValueError:
                        # Ignoring as it could be average_last_1percent etc.
                        pass

        self.analysis_results = analysis_results

        return self.analysis_results
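
# Usage sketch (illustrative): NetworkAnalysis takes a dict of traces keyed by
# reference; results are prefixed with that reference.
#
#     volts = {'cell0': v0, 'cell1': v1}   # v0, v1: traces sharing time vector t
#     na = NetworkAnalysis(volts, t, analysis_var)
#     results = na.analyse(targets=['cell0:mean_spike_frequency',
#                                   'cell1:average_last_1percent'])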