"""
.. module:: constraints
:platform: Unix
:synopsis: This module implements the usual statistical tools you need to calculate cosmological parameters confidence intervals
.. moduleauthor:: Andrea Petri <apetri@phys.columbia.edu>
"""
from __future__ import division,print_function,with_statement
import sys
from operator import mul
from functools import reduce
if sys.version_info.major>=3:
import _pickle as pickle
else:
import cPickle as pickle
#########################################################
import numpy as np
import pandas as pd
from scipy import stats,interpolate
from emcee.ensemble import _function_wrapper
try:
from matplotlib.patches import Ellipse
except ImportError:
Ellipse = None
#########################################################
from ..utils.algorithms import precision_bias_correction
from .ensemble import Series,Ensemble,Panel
from . import samplers
#########################################################
#############Default Gaussian data likelihood############
#########################################################
def gaussian_likelihood(chi2,norm=1.0):
return norm*np.exp(-0.5*chi2)
######################################################################
##########Default chi2 calculation with the sandwich product##########
######################################################################
def chi2(parameters,*args,**kwargs):
model_feature = _predict(parameters,kwargs["interpolator"])
inverse_covariance = kwargs["inverse_covariance"]
if model_feature.ndim==1:
observed_feature = kwargs["observed_feature"]
else:
observed_feature = kwargs["observed_feature"][None,:]
inverse_covariance_dot = np.dot(observed_feature - model_feature,inverse_covariance)
return ((observed_feature - model_feature) * inverse_covariance_dot).sum(-1)
#######################################################################
#############Feature prediction wrapper################################
#######################################################################
#Fast interpolation method
def _interpolate_fast(p,parameter_grid,method,weights,epsilon):
return method(((parameter_grid[None]-p[:,None])**2).sum(-1),epsilon).dot(weights)
def _predict(parameters,interpolator):
#Cast to higher dimension
parameters = np.atleast_2d(parameters)
if isinstance(interpolator,list):
#For each feature bin, compute its interpolated value
interpolated_feature = np.zeros((parameters.shape[0],len(interpolator)))
for n,i in enumerate(interpolator):
interpolated_feature[:,n] = i()(*parameters.T)
else:
#Compute fast interpolation
interpolated_feature = interpolator(parameters)
return np.squeeze(interpolated_feature)
##############################################
###########Analysis base class################
##############################################
[docs]class Analysis(Ensemble):
"""
The base class of this module; the idea in weak lensing analysis is that one has a set of simulated data, that serves as training model, and then uses that set to fit the observations for the best model parameters. Inherits from :py:class:`Ensemble`
"""
_analysis_type = None
def _check_valid(self):
assert "parameters" in self.columns.levels[0],"There are no parameters specified for this analysis!"
@classmethod
def from_features(cls,features,parameters=None,feature_index=None,parameter_index=None):
#If features and parameters are already in DataFrame instances then just append them
if isinstance(features,pd.DataFrame) and isinstance(parameters,pd.DataFrame):
assert len(parameters.columns.levels[0])==1 and parameters.columns.levels[0][0]=="parameters"
return cls.concat((parameters,features),axis=1)
#Cast shapes correctly
if len(features.shape)==1:
features = features[None]
if parameters is None:
parameters = np.arange(len(features))[None]
if len(parameters.shape)==1:
parameters = parameters[None]
#Make the indices
if parameter_index is None:
parameter_index = Series.make_index(pd.Index(range(parameters.shape[1]),name="parameters"))
elif isinstance(parameter_index,list):
parameter_index = Series.make_index(pd.Index(parameter_index,name="parameters"))
if feature_index is None:
feature_index = Series.make_index(pd.Index(range(features.shape[1]),name="features"))
elif isinstance(feature_index,list):
feature_index = Series.make_index(pd.Index(feature_index,name="features"))
#Instantiate the parameter and feature part of the analysis
analysis_param = cls(parameters,columns=parameter_index)
analysis_features = cls(features,columns=feature_index)
#Instantiate Analysis
return cls.concat((analysis_param,analysis_features),axis=1)
##################
####Properties####
##################
@property
def feature_names(self):
all_names = list(self.columns.levels[0])
all_names.remove("parameters")
return all_names
@property
def parameter_names(self):
return list(self["parameters"].columns)
@property
def parameter_set(self):
return self["parameters"].values
@property
def feature_set(self):
return self[self.feature_names].values
def parameters(self,names=None):
if names is None:
return self
parameter_names = self.parameter_names
if isinstance(names,str):
names = [names]
subset = self.copy()
exclude_names = filter(lambda n:not n in names,parameter_names)
for n in exclude_names:
subset.pop(("parameters",n))
return subset
def features(self,names=None):
if names is None:
return self
elif isinstance(names,str):
return self[["parameters"]+[names]].copy()
elif isinstance(names,list):
return self[["parameters"]+names].copy()
elif isinstance(names,dict):
pieces = [self[["parameters"]]]
for key in names.keys():
piece = self[key][names[key]]
piece.add_name(key)
pieces.append(piece)
return self.__class__.concat(pieces,axis=1)
else:
raise TypeError("names type not supported!")
##################
####Operations####
##################
[docs] def add_models(self,parameters,feature):
"""
Add a model to the training set of the current analysis
:param parameters: parameter set of the new model
:type parameters: array
:param feature: measured feature of the new model
:type feature: array
"""
#Cast dimensions
if len(parameters.shape)==1:
parameters = parameters[None]
if len(feature.shape)==1:
feature = feature[None]
#Check for input valudity
assert len(parameters)==len(feature)
assert parameters.shape[1] == self.parameter_set.shape[1]
assert feature.shape[1:] == self.feature_set.shape[1:]
#hstack
parameters_and_features = np.hstack((parameters,feature))
#Return the newly created Analysis
return self.append(self._constructor(parameters_and_features,columns=self.columns),ignore_index=True)
[docs] def reparametrize(self,transformation,**kwargs):
"""
Reparametrize the parameter set of the analysis by calling the formatter handle on the current parameter set (can be used to enlarge/shrink/relabel the parameter set)
:param transformation: transformation function called on the parameters, must take in a row of parameters and return another row of parameters
:type transformation: callable
:param kwargs: the keyword arguments are passed to the transformation callable
:type kwargs: dict.
:returns: reparametrized Analysis
"""
#Apply the transformation
new_parameters = self["parameters"].apply(transformation,axis=1,**kwargs)
new_parameters.columns.name = "parameters"
new_parameters.columns = Series.make_index(new_parameters.columns)
#Return the reparametrized analysis
reparametrized_analysis = self.copy()
reparametrized_analysis.pop("parameters")
return self.__class__.concat((new_parameters,reparametrized_analysis),axis=1)
[docs] def refeaturize(self,transformation,method="apply_row",**kwargs):
"""
Allows a general transformation on the feature set of the analysis by calling an arbitrary transformation function
:param transformation: callback function called on the feature_set; must take in a row of features and return a row of features. If a dictionary is passed, the keys must be the feature names
:type transformation: callable or dict.
:param kwargs: the keyword arguments are passed to the transformation callable
:type kwargs: dict.
:returns: transformed Analysis
"""
#Build transformation dictionary
if isinstance(transformation,dict):
transformation_dict = dict((n,lambda x:x) for n in self.feature_names)
for n in transformation.keys():
transformation_dict[n] = transformation[n]
else:
transformation_dict = dict((n,transformation) for n in self.feature_names)
#Apply the transformations to each feature
transformed_features = list()
for n in self.feature_names:
if method=="apply_row":
transformed_feature = self[[n]].apply(transformation_dict[n],axis=1,**kwargs)
elif method=="apply_whole":
transformed_feature = transformation_dict[n](self[[n]],**kwargs)
transformed_feature.add_name(n)
transformed_feature.index = self.index
elif method=="dot":
transformed_feature = self[[n]].dot(transformation_dict[n])
transformed_feature.add_name(n)
transformed_feature.index = self.index
else:
raise NotImplementedError("transformation method {0} not implemented!".format(method))
transformed_features.append(transformed_feature)
#Concatenate and return
return self.__class__.concat([self[["parameters"]]]+transformed_features,axis=1)
[docs] def combine_features(self,combinations):
"""
Combine features in the Analysis, according to a dictionary which keys are the name of the combined features
:param combinations: mapping of combined features onto the old ones
:type combinations: dict.
"""
combined_features = list()
#Cycle over the combinations keys
for n in combinations.keys():
#Select
combined_feature = self[combinations[n]].copy()
#Merge the column names
combined_feature_index = pd.Index(np.hstack([ combined_feature[c].columns.values for c in combinations[n] ]),name=n)
combined_feature_index = Series.make_index(combined_feature_index)
combined_feature.columns = combined_feature_index
#Append to the combination list
combined_features.append(combined_feature)
#Concatenate everything
return self.__class__.concat([self[["parameters"]]]+combined_features,axis=1)
###############################################################################################################################
[docs] def find(self,parameters,rtol=1.0e-05):
"""
Finds the location in the instance that has the specified combination of parameters
:param parameters: the parameters of the model to find
:type parameters: array.
:param rtol: tolerance of the search (must be less than 1)
:type rtol: float.
:returns: array of int. with the indices of the corresponding models
"""
assert len(parameters)==self.parameter_set.shape[1]
search_result = np.all(np.isclose(self.parameter_set,parameters,rtol=rtol),axis=1)
return np.where(search_result==True)[0]
###############################################################################################################################
@staticmethod
def ellipse(center,covariance,p_value=0.684,**kwargs):
"""
Draws a confidence ellipse using matplotlib Ellipse patch
:param center: center of the ellipse
:type center: tuple.
:param covariance: parameters covariance matrix
:type covariance: 2D-array.
:param p_value: p-value to calculate
:type p_value: float.
:param kwargs: the keyword arguments are passed to the matplotlib Ellipse method
:type kwargs: dict.
:returns: matplotlib ellipse object
:rtype: Ellipse
"""
#Check that ellipse patch is available
if Ellipse is None:
raise ImportError("The matplotlib Ellipse patch is necessary to use this method!")
#Compute the directions and sizes of the ellipse axes
w,v = np.linalg.eigh(covariance)
width,height = 2*np.sqrt(w * stats.chi2(2).ppf(p_value))
try:
angle = 180.*np.arctan(v[1,0] / v[0,0]) / np.pi
except ZeroDivisionError:
angle = 90.
#Draw the ellipse
return Ellipse(center,width,height,angle=angle,**kwargs)
###################################################
#############Fisher matrix analysis################
###################################################
class FisherSeries(Series):
@property
def _constructor_expanddim(self):
return FisherAnalysis
[docs]class FisherAnalysis(Analysis):
################################################################
##############DataFrame subclassing#############################
################################################################
@property
def _constructor_sliced(self):
return FisherSeries
@property
def _constructor_expanddim(self):
return FisherPanel
#################################################################
_analysis_type = "Fisher"
_fiducial = 0
"""
The class handler of a Fisher matrix analysis, inherits from the base class Analysis
"""
[docs] def set_fiducial(self,n):
"""
Sets the fiducial model (with respect to which to compute the derivatives), default is 0 (i.e. self.parameter_set[0])
:param n: the parameter set you want to use as fiducial
:type n: int.
"""
assert n < self.parameter_set.shape[0],"There are less than {0} models in your analysis".format(n+1)
self._fiducial = n
@property
def fiducial(self):
return self.feature_set[self._fiducial]
@property
def _variations(self):
"""
Checks the parameter variations with respect to the fiducial cosmology
:returns: bool array (True if the parameter is varied, False otherwise)
"""
return self.parameter_set!=self.parameter_set[self._fiducial]
@property
def variations(self):
"""
Checks the parameter variations with respect to the fiducial cosmology
:returns: iterable with the positions of the variations
"""
for n,b in enumerate(self._variations.sum(1)):
if b:
yield n
[docs] def check(self):
"""
Asserts that the parameters are varied one at a time, and that a parameter is not varied more than once
:raises: AssertionError
"""
assert (self._variations.sum(1)<2).all(),"You can vary only a parameter at a time!"
#Check how many variations are there for each parameter
num_par_variations = self._variations.sum(0)
if (num_par_variations<2).all():
return 0
else:
return 1
[docs] def where(self,par=None):
"""
Finds the locations of the varied parameters in the parameter set
:returns: dict. with the locations of the variations, for each parameter
"""
loc = dict()
v = np.where(self._variations==1)
#Decide if keys are lists or simple numbers
if self.check():
for n in range(self.parameter_set.shape[1]):
loc[n] = list()
for n in range(len(v[0])):
loc[v[1][n]].append(v[0][n])
else:
for n in range(len(v[0])):
loc[v[1][n]] = v[0][n]
if par is None:
return loc
else:
return loc[par]
@property
def varied(self):
"""
Returns the indices of the parameters that are varied
:returns: list with the indices of the varied parameters
"""
return list(sorted(self.where()))
[docs] def compute_derivatives(self):
"""
Computes the feature derivatives with respect to the parameter sets using one step finite differences; the derivatives are computed with respect to the fiducial parameter set
:returns: array of shape (p,N), where N is the feature dimension and p is the number of varied parameters
"""
assert self.parameter_set.shape[0] > 1,"You need at least 2 models to proceed in a Fisher Analysis!"
assert self.check()==0,"Finite differences implemented only at first order! Cannot compute derivatives"
#Find the varied parameters and their locations
loc_varied = self.where()
par_varied = list(sorted(loc_varied))
#Allocate space for the derivatives
derivatives = np.zeros((len(par_varied),)+self.feature_set.shape[1:])
#cycle to parameters to calculate derivatives
for n,p in enumerate(par_varied):
#Calculate the finite difference derivative with respect to this parameter
derivatives[n] = (self.feature_set[loc_varied[p]] - self.feature_set[self._fiducial]) / (self.parameter_set[loc_varied[p],p] - self.parameter_set[self._fiducial,p])
#set the derivatives attribute and return the result
self._derivatives = self.__class__(derivatives,index=[self.parameter_names[n] for n in par_varied],columns=self[self.feature_names].columns)
@property
def derivatives(self):
if not hasattr(self,"_derivatives"):
self.compute_derivatives()
return self._derivatives
[docs] def chi2(self,observed_feature,features_covariance,correct=None):
"""
Computes the chi2 between an observed feature and the fiducial feature, using the provided covariance
:param observed_feature: observed feature to fit, its last dimension must have the same shape as self.feature_set[0]
:type observed_feature: array
:param features_covariance: covariance matrix of the simulated features, must be provided for a correct fit!
:type features_covariance: 2 dimensional array (or 1 dimensional if diagonal)
:param correct: if not None, correct for the bias in the inverse covariance estimator assuming the covariance was estimated by 'correct' simulations
:type correct: int.
:returns: chi2 of the comparison
:rtype: float.
"""
#Cast pandas types
assert features_covariance is not None,"No science without the covariance matrix, you must provide one!"
if isinstance(observed_feature,pd.Series) or isinstance(observed_feature,pd.DataFrame):
observed_feature = observed_feature.values
if isinstance(features_covariance,pd.DataFrame):
features_covariance = features_covariance.values
#Cast the observed feature in suitable shape
if len(observed_feature.shape)==1:
observed_feature = observed_feature[None]
single = True
else:
single = False
#Check for correct shape of input
assert observed_feature.shape[-1:]==self.feature_set.shape[-1:]
assert features_covariance.shape in [self.feature_set.shape[-1:],self.feature_set.shape[-1:]*2]
#Compute the difference
difference = observed_feature - self.fiducial[None]
#Compute the chi2
if features_covariance.shape==self.feature_set.shape[-1:]:
result = ((difference**2)/features_covariance[None]).sum(-1)
else:
if correct is not None:
result = (difference * np.linalg.solve(features_covariance/precision_bias_correction(correct,len(features_covariance)),difference.transpose()).transpose()).sum(-1)
else:
result = (difference * np.linalg.solve(features_covariance,difference.transpose()).transpose()).sum(-1)
#Return the result
if single:
return result[0]
else:
return result
[docs] def fit(self,observed_feature,features_covariance):
"""
Maximizes the gaussian likelihood on which the Fisher matrix formalism is based, and returns the best fit for the parameters given the observed feature
:param observed_feature: observed feature to fit, must have the same shape as self.feature_set[0]
:type observed_feature: array
:param features_covariance: covariance matrix of the simulated features, must be provided for a correct fit!
:type features_covariance: 2 dimensional array (or 1 dimensional if assumed diagonal)
:returns: array with the best fitted parameter values
"""
assert features_covariance is not None,"No science without the covariance matrix, you must provide one!"
#Cast pandas types
if isinstance(observed_feature,pd.Series) or isinstance(observed_feature,pd.DataFrame):
observed_feature = observed_feature.values
if isinstance(features_covariance,pd.DataFrame):
features_covariance = features_covariance.values
#Check for correct shape of input
assert (observed_feature.shape==self.feature_set.shape[1:]) or (observed_feature.shape[1:]==self.feature_set.shape[1:])
assert (features_covariance.shape==self.feature_set.shape[1:] * 2) or (features_covariance.shape==self.feature_set.shape[1:])
#Linear algebra manipulations (parameters = M x features)
if features_covariance.shape==self.feature_set.shape[1:] * 2:
Y = np.linalg.solve(features_covariance,self.derivatives.values.transpose())
else:
Y = (1/features_covariance[:,np.newaxis]) * self.derivatives.values.transpose()
XY = np.dot(self.derivatives.values,Y)
M = np.linalg.solve(XY,Y.transpose())
#Compute difference in parameters (with respect to the fiducial model)
if observed_feature.ndim==1:
dP = np.dot(M,observed_feature - self.feature_set[self._fiducial])
else:
dP = np.dot(observed_feature - self.feature_set[[self._fiducial]],M.T)
#Return the actual best fit
if dP.ndim==1:
return self._constructor_sliced(self.parameter_set[self._fiducial,self.varied] + dP,index=self.derivatives.index)
else:
return self.__class__(self.parameter_set[self._fiducial,self.varied][None] + dP,columns=self.derivatives.index)
[docs] def classify(self,observed_feature,features_covariance,correct=None,labels=range(2),confusion=False):
"""
Performs a Fisher classification of the observed feature, choosing the most probable label based on the value of the chi2
:param observed_feature: observed feature to fit, the last dimenstion must have the same shape as self.feature_set[0]
:type observed_feature: array
:param features_covariance: covariance matrix of the simulated features, must be provided for a correct classification!
:type features_covariance: 2 dimensional array (or 1 dimensional if assumed diagonal)
:param correct: if not None, correct for the bias in the inverse covariance estimator assuming the covariance was estimated by 'correct' simulations
:type correct: int.
:param labels: labels of the classification, must be the indices of the available classes (from 0 to feature_set.shape[0])
:type labels: iterable
:param confusion: if True, an array with the label percentage occurrences is returned; if False an array of labels is returned
:type confusion: bool.
:returns: array with the labels resulting from the classification
:rtype: int.
"""
fiducial_original = self._fiducial
#Compute all the chi squared values, for each observed feature and each label
all_chi2 = list()
for l in labels:
self.set_fiducial(l)
all_chi2.append(self.chi2(observed_feature,features_covariance,correct=correct))
self.set_fiducial(fiducial_original)
#Cast the list into an array
all_chi2 = np.array(all_chi2)
#Find the minima
chi2_min = all_chi2.argmin(0)
#Translate into the corresponding classes
classes = np.zeros_like(chi2_min)
for n,l in enumerate(labels):
classes[chi2_min==n] = l
if confusion:
#Compute confusion array
confusion_array = np.zeros(n+1)
for n,l in enumerate(labels):
confusion_array[n] = (classes==l).sum() / len(classes)
#Return
return confusion_array
else:
#Return
return classes
###############################################################################################################################################################
[docs] def parameter_covariance(self,simulated_features_covariance,correct=None,observed_features_covariance=None):
"""
Computes the parameter covariance matrix using the associated features, that in the end allows to compute the parameter confidence contours (around the fiducial value)
:param simulated_features_covariance: covariance matrix of the simulated features, must be provided for a correct fit!
:type simulated_features_covariance: 2 dimensional array (or 1 dimensional if assumed diagonal)
:param correct: if not None, correct for the bias in the inverse covariance estimator assuming the covariance was estimated by 'correct' simulations
:type correct: int.
:param observed_features_covariance: covariance matrix of the simulated features, if different from the simulated one; if None the simulated feature covariance is used
:type observed_features_covariance: 2 dimensional array (or 1 dimensional if assumed diagonal)
:returns: 2 dimensional array with the parameter covariance matrix of the analysis
"""
assert simulated_features_covariance is not None,"No science without the covariance matrix, you must provide one!"
#Cast pandas types
if isinstance(simulated_features_covariance,pd.DataFrame):
simulated_features_covariance = simulated_features_covariance.values
if (observed_features_covariance is not None) and (isinstance(observed_features_covariance,pd.DataFrame)):
observed_features_covariance = observed_features_covariance.values
#Check for correct shape of input
assert simulated_features_covariance.shape == self.feature_set.shape[1:] * 2 or simulated_features_covariance.shape == self.feature_set.shape[1:]
#Linear algebra manipulations (parameters = M x features)
if simulated_features_covariance.shape == self.feature_set.shape[1:] * 2:
Y = np.linalg.solve(simulated_features_covariance,self.derivatives.values.transpose())
else:
Y = (1/simulated_features_covariance[:,np.newaxis]) * self.derivatives.values.transpose()
XY = np.dot(self.derivatives.values,Y)
#If we are using the same covariance matrix for observations and simulations, then XY is the Fisher matrix; otherwise we need to compute M too
if observed_features_covariance is None:
if correct is not None:
return self.__class__(np.linalg.inv(XY),index=self.derivatives.index,columns=self.derivatives.index) / precision_bias_correction(correct,len(simulated_features_covariance))
else:
return self.__class__(np.linalg.inv(XY),index=self.derivatives.index,columns=self.derivatives.index)
else:
assert observed_features_covariance.shape == self.feature_set.shape[1:] * 2 or observed_features_covariance.shape == self.feature_set.shape[1:]
M = np.linalg.solve(XY,Y.transpose())
if observed_features_covariance.shape == self.feature_set.shape[1:] * 2:
parameter_covariance = np.dot(M,np.dot(observed_features_covariance,M.transpose()))
else:
parameter_covariance = np.dot(M * observed_features_covariance,M.transpose())
return self.__class__(parameter_covariance,index=self.derivatives.index,columns=self.derivatives.index)
[docs] def fisher_matrix(self,simulated_features_covariance,correct=None,observed_features_covariance=None):
"""
Computes the parameter Fisher matrix using the associated features, that in the end allows to compute the parameter confidence contours (around the fiducial value)
:param simulated_features_covariance: covariance matrix of the simulated features, must be provided for a correct fit!
:type simulated_features_covariance: 2 dimensional array (or 1 dimensional if assumed diagonal)
:param correct: if not None, correct for the bias in the inverse covariance estimator assuming the covariance was estimated by 'correct' simulations
:type correct: int.
:param observed_features_covariance: covariance matrix of the simulated features, if different from the simulated one; if None the simulated feature covariance is used
:type observed_features_covariance: 2 dimensional array (or 1 dimensional if assumed diagonal)
:returns: 2 dimensional array with the Fisher matrix of the analysis
"""
parcov = self.parameter_covariance(simulated_features_covariance,correct,observed_features_covariance)
return self.__class__(np.linalg.inv(parcov.values),index=parcov.index,columns=parcov.columns)
[docs] def confidence_ellipse(self,simulated_features_covariance,correct=None,observed_feature=None,observed_features_covariance=None,parameters=["Om","w"],p_value=0.684,**kwargs):
"""
Draws a confidence ellipse of a specified p-value in parameter space, corresponding to fit an observed feature for the cosmological parameters
:param observed_feature: observed feature to fit, the last dimenstion must have the same shape as self.feature_set[0]
:type observed_feature: array
:param simulated_features_covariance: covariance matrix of the simulated features, must be provided for a correct fit!
:type simulated_features_covariance: 2 dimensional array (or 1 dimensional if assumed diagonal)
:param correct: if not None, correct for the bias in the inverse covariance estimator assuming the covariance was estimated by 'correct' simulations
:type correct: int.
:param observed_features_covariance: covariance matrix of the simulated features, if different from the simulated one; if None the simulated feature covariance is used
:type observed_features_covariance: 2 dimensional array (or 1 dimensional if assumed diagonal)
:param parameters: parameters to compute the condifence contour of
:type parameters: list.
:param p_value: p-value to calculate
:type p_value: float.
:param kwargs: the keyword arguments are passed to the matplotlib Ellipse method
:type kwargs: dict.
:returns: matplotlib ellipse object
:rtype: Ellipse
"""
if len(parameters)!=2:
raise ValueError("You must specify exactly two parameters to draw the ellipse of!")
#If the observed feature is not provided, the center of the ellipse is (0,0)
if observed_feature is None:
center = (0,0)
else:
#Fit the observed feature and put the center here
p_fit = self.fit(observed_feature,simulated_features_covariance)
center = tuple(p_fit[parameters])
#The parameter covariance sets the size and orientation of the ellipse
p_cov = self.parameter_covariance(simulated_features_covariance,correct,observed_features_covariance)[parameters].loc[parameters]
#Return Ellipse object to user
return self.ellipse(center,p_cov.values,p_value,**kwargs)
###########################################################################################################################################
def reparametrize(self,formatter,*args,**kwargs):
#Call the parent method
super(FisherAnalysis,self).reparametrize(formatter,*args,**kwargs)
#Check that the format of the parameter set is valid
self.check()
class FisherPanel(Panel):
@property
def _constructor_sliced(self):
return FisherAnalysis
#######################################################
#############Full analysis#############################
#######################################################
class EmulatorSeries(Series):
@property
def _constructor_expanddim(self):
return Emulator
[docs]class Emulator(Analysis):
_analysis_type = "Emulator"
"""
The class handler of a full likelihood analysis; the parameter likelihood function is calculated with an interpolation of various kind between simulation points
"""
################################################################
##############DataFrame subclassing#############################
################################################################
@property
def _constructor_sliced(self):
return EmulatorSeries
@property
def _constructor_expanddim(self):
return EmulatorPanel
##################################
########Constructor###############
##################################
def __init__(self,*args,**kwargs):
super(Emulator,self).__init__(*args,**kwargs)
self._likelihood_function = gaussian_likelihood
if "_likelihood_function" not in self._metadata:
self._metadata.append("_likelihood_function")
#######################################################################################################################################
[docs] def approximate_linear(self,center,derivative_precision=0.1):
"""
Construct a FisherAnalysis by approximating the Emulator as a linear expansion along a chosen center
:param center: center point in parameter space
:type center: Series
:param derivative_precision: percentage step for the finite difference derivatives
:type derivative_precision: float.
:returns: linearly approximated Fisher analysis
:rtype: FisherAnalysis
"""
npar = len(self.parameter_names)
#Construct the parameter set for the Fisher Analysis
parameters_fisher = Ensemble(np.zeros((npar+1,npar)),columns=self.parameter_names)
#Fiducial
for n in range(len(parameters_fisher)):
parameters_fisher.iloc[n] = center
#Variations
for n in range(1,len(parameters_fisher)):
parameters_fisher.iloc[n,n-1] += np.abs(parameters_fisher.iloc[n,n-1])*derivative_precision
#Predict the features with the emulator, build the Fisher Analysis
features = self.predict(parameters_fisher)
parameters_fisher.add_name("parameters")
return FisherAnalysis.from_features(features,parameters=parameters_fisher)
#######################################################################################################################################
[docs] def set_likelihood(self,function=None):
"""
Sets the likelihood function to a custom function input by the user: the default is the usual exp(-0.5*chi^2)
"""
assert function is not None
self._likelihood_function = function
[docs] def train(self,use_parameters="all",method="Rbf",**kwargs):
"""
Builds the interpolators for each of the feature bins using a radial basis function approach
:param use_parameters: which parameters actually vary in the supplied parameter set (it doesn't make sense to interpolate over the constant ones)
:type use_parameters: list. or "all"
:param method: interpolation method; can be 'Rbf' or callable. If callable, it must take two arguments, a square distance and a square length smoothing scale
:type method: str. or callable
:param kwargs: keyword arguments to be passed to the interpolator constructor
"""
#input sanity check
if use_parameters != "all":
assert type(use_parameters) == list
used_parameters = self.parameter_set[:,use_parameters]
else:
used_parameters = self.parameter_set
#Compute total number of feature bins and reshape the training set accordingly
if "_num_bins" not in self._metadata:
self._metadata.append("_num_bins")
self._num_bins = reduce(mul,self.feature_set.shape[1:])
flattened_feature_set = self.feature_set.reshape((self.feature_set.shape[0],self._num_bins))
#Build the interpolator
if "_interpolator" not in self._metadata:
self._metadata.append("_interpolator")
if method=="Rbf":
#Scipy Rbf method
self._interpolator = list()
for n in range(self._num_bins):
self._interpolator.append(_interpolate_wrapper(interpolate.Rbf,args=(tuple(used_parameters.T) + (flattened_feature_set[:,n],)),kwargs=kwargs))
else:
#Compute pairwise square distance between points
distances = ((used_parameters[None] - used_parameters[:,None])**2).sum(-1)
epsilon = distances[np.triu_indices(len(distances),k=1)].mean()
kernel = method(distances,epsilon)
weights = np.linalg.solve(kernel,self.feature_set)
#Wrap interpolator
self._interpolator = _function_wrapper(_interpolate_fast,args=[],kwargs={"parameter_grid":used_parameters,"method":method,"weights":weights,"epsilon":epsilon})
###############################################################################################################################################################
[docs] def predict(self,parameters,raw=False):
"""
Predicts the feature at a new point in parameter space using the bin interpolators, trained with the simulated features
:param parameters: new points in parameter space on which to compute the chi2 statistic; it'a (N,p) array where N is the number of points and p the number of parameters, or array of size p if there is only one point
:type parameters: array
:param raw: if True returns raw numpy arrays
:type raw: bool.
:returns: predicted features
:rtype: array or :py:class:`Ensemble`
"""
#If you didn't do training before, train now with the default settings
if not hasattr(self,"_interpolator"):
self.train()
#Cast DataFrames to numpy arrays
if isinstance(parameters,pd.DataFrame):
assert (parameters.columns==self["parameters"].columns).all(),"Parameters do not match!"
parameters = parameters.values
elif isinstance(parameters,pd.Series):
assert (parameters.index==self["parameters"].columns).all(),"Parameters do not match!"
parameters = parameters.values
#####################################
#Interpolate to compute the features#
#####################################
interpolated_feature = _predict(parameters,self._interpolator)
############################################################################################
#Return the result
if raw:
return interpolated_feature
else:
if parameters.ndim==1:
return Series(interpolated_feature.reshape(self.feature_set.shape[1:]),index=self[self.feature_names].columns)
else:
return Ensemble(interpolated_feature.reshape((parameters.shape[0],) + self.feature_set.shape[1:]),columns=self[self.feature_names].columns)
###############################################################################################################################################################
[docs] def chi2(self,parameters,observed_feature,features_covariance,correct=None,split_chunks=None,pool=None):
"""
Computes the chi2 part of the parameter likelihood with the usual sandwich product with the covariance matrix; the model features are computed with the interpolators
:param parameters: new points in parameter space on which to compute the chi2 statistic
:type parameters: (N,p) array where N is the number of points and p the number of parameters
:param observed_feature: observed feature on which to condition the parameter likelihood
:type observed_feature: array
:param features_covariance: covariance matrix of the features, must be supplied
:type features_covariance: array
:param correct: if not None, correct for the bias in the inverse covariance estimator assuming the covariance was estimated by 'correct' simulations
:type correct: int.
:param split_chunks: if set to an integer bigger than 0, splits the calculation of the chi2 into subsequent chunks, each that takes care of an equal number of points. Each chunk could be taken care of by a different processor
:type split_chunks: int.
:returns: array with the chi2 values, with the same shape of the parameters input
"""
#Sanity checks
assert observed_feature is not None
assert features_covariance is not None,"No science without the covariance matrix, you must provide one!"
assert observed_feature.shape == self.feature_set.shape[1:]
assert features_covariance.shape == observed_feature.shape * 2
#If you didn't do training before, train now with the default settings
if not hasattr(self,"_interpolator"):
self.train()
#Reformat the parameter input into a list of chunks
if parameters.ndim==1:
num_points = 1
else:
num_points = parameters.shape[0]
if split_chunks is None:
parameter_chunks = [parameters]
elif split_chunks > 0:
assert num_points%split_chunks == 0,"split_chunks must divide exactly the number of points!!"
chunk_length = num_points//split_chunks
parameter_chunks = [ parameters[n*chunk_length:(n+1)*chunk_length] for n in range(split_chunks) ]
else:
raise ValueError("split_chunks must be >0!!")
#Compute the inverse of the covariance matrix once and for all
covinv = np.linalg.inv(features_covariance)
if correct is not None:
covinv *= precision_bias_correction(correct,len(covinv))
#Build the keyword argument dictionary to be passed to the chi2 calculator
kwargs = {"interpolator":self._interpolator,"inverse_covariance":covinv,"observed_feature":observed_feature}
#Hack to make the chi2 pickleable (from emcee)
chi2_wrapper = _function_wrapper(chi2,tuple(),kwargs)
#Finally map chi2 calculator on the list of chunks
if pool is not None:
M = pool.map
else:
M = map
chi2_list = list(M(chi2_wrapper,parameter_chunks))
return np.array(chi2_list).reshape(num_points)
[docs] def chi2Contributions(self,parameters,observed_feature,features_covariance,correct=None):
"""
Computes the individual contributions of each feature bin to the chi2; the model features are computed with the interpolators. The full chi2 is the sum of the individual contributions
:param parameters: new points in parameter space on which to compute the chi2 statistic
:type parameters: (N,p) array where N is the number of points and p the number of parameters
:param observed_feature: observed feature on which to condition the parameter likelihood
:type observed_feature: array
:param features_covariance: covariance matrix of the features, must be supplied
:type features_covariance: array
:param correct: if not None, correct for the bias in the inverse covariance estimator assuming the covariance was estimated by 'correct' simulations
:type correct: int.
:returns: numpy 2D array with the contributions to the chi2 (off diagonal elements are the contributions of the cross correlation between bins)
"""
#Sanity checks
assert observed_feature is not None
assert features_covariance is not None,"No science without the covariance matrix, you must provide one!"
assert observed_feature.shape == self.feature_set.shape[1:]
assert features_covariance.shape == observed_feature.shape * 2
#If you didn't do training before, train now with the default settings
if not hasattr(self,"_interpolator"):
self.train()
#Compute each bin contribution to the chi2
residuals = observed_feature - self.predict(parameters)
#Compute the inverse covariance
covinv = np.linalg.inv(features_covariance)
if correct is not None:
covinv *= precision_bias_correction(correct,len(inverse_covariance_dot))
#Compute the hits map
return np.outer(residuals,residuals) * covinv
[docs] def likelihood(self,chi2_value,**kwargs):
"""
Computes the likelihood value with the selected likelihood function, given the pre-computed chi2 value
:param chi2_value: chi squared values
:type chi2_value: array
:param kwargs: keyword arguments to be passed to your likelihood function
"""
return self._likelihood_function(chi2_value,**kwargs)
############################################################################################################################################
[docs] def score(self,parameters,observed_feature,method="chi2",**kwargs):
"""
Compute the score for an observed feature for each combination of the proposed parameters
:param parameters: parameter combinations to score
:type parameters: DataFrame or array
:param observed_feature: observed feature to score
:type observed_feature: Series
:param method: scoring method to use (defaults to chi2): if callable, must take in the current instance, the parameter array and the observed feature and return a score for each parameter combination
:type method: str. or callable
:param kwargs: keyword arguments passed to the callable method
:type kwargs: dict.
:returns: ensemble with the scores, for each feature
:rtype: :py:class:`Ensemble`
"""
#Get the names of the features to use
feature_names = list(observed_feature.index.levels[0])
try:
feature_names.remove("parameters")
except ValueError:
pass
#Check that the observed feature columns and the Emulator columns correspond
for c in feature_names:
assert c in self.feature_names,"Feature '{0}' is not present in the Emulator!".format(c)
#Reorder the parameters according to the ones in the Emulator
parameters = parameters[self.parameter_names]
#If the method is chi2, a covariance matrix must be provided
if method=="chi2":
assert "features_covariance" in kwargs.keys()
features_covariance = kwargs["features_covariance"]
del(kwargs["features_covariance"])
assert (features_covariance.index==observed_feature.index).all()
assert (features_covariance.columns==observed_feature.index).all()
#Build an Ensemble with the parameters
score_ensemble = Ensemble(parameters)
#For each feature, compute the score
for c in feature_names:
#Isolate the Emulator that concerns this feature only
sub_emulator = self.features(c)
sub_emulator_columns = sub_emulator[c].columns
#Isolate the observed sub_feature
sub_feature = observed_feature[c][sub_emulator_columns].values
#If the method is chi2, use the already implemented version of it
if method=="chi2":
sub_emulator.train()
sub_feature_covariance = features_covariance[c][sub_emulator_columns].loc[c].loc[sub_emulator_columns].values
score_ensemble[c] = sub_emulator.chi2(parameters=parameters.values,observed_feature=sub_feature,features_covariance=sub_feature_covariance,**kwargs)
else:
score_ensemble[c] = method(sub_emulator,parameters.values,sub_feature,**kwargs)
#Return the score ensemble
return score_ensemble
############################################################################################################################################
[docs] def sample_posterior(self,observed_feature,sample="emcee",**kwargs):
"""
Sample the parameter posterior distribution
:param observed_feature: observed feature to score
:type observed_feature: Series
:param sample: posterior sampling method
:type sample: str. or callable
:returns: samples from the posterior distribution
:rtype: dict.
"""
if sample=="emcee":
sample = samplers.emcee_sampler
#Get the names of the features to use
feature_names = list(observed_feature.index.levels[0])
try:
feature_names.remove("parameters")
except ValueError:
pass
#Check that the observed feature columns and the Emulator columns correspond
for c in feature_names:
assert c in self.feature_names,"Feature '{0}' is not present in the Emulator!".format(c)
#Check if the user provides a covariance matrix
if "features_covariance" in kwargs.keys():
features_covariance = kwargs["features_covariance"]
del(kwargs["features_covariance"])
assert (features_covariance.index==observed_feature.index).all()
assert (features_covariance.columns==observed_feature.index).all()
else:
features_covariance = None
#Parameter samples
samples = dict()
#For each feature, compute the score
for c in feature_names:
#Isolate the Emulator that concerns this feature only
sub_emulator = self.features(c)
sub_emulator_columns = sub_emulator[c].columns
#Isolate the observed sub_feature
sub_feature = observed_feature[c][sub_emulator_columns].values
#Train the emulator
sub_emulator.train()
#Isolate the sub feature covariance matrix, proceed with the sampling
if features_covariance is not None:
sub_feature_covariance = features_covariance[c][sub_emulator_columns].loc[c].loc[sub_emulator_columns].values
samples[c] = sample(emulator=sub_emulator,observed_feature=sub_feature,features_covariance=sub_feature_covariance,**kwargs)
else:
samples[c] = sample(emulator=sub_emulator,observed_feature=sub_feature,**kwargs)
#Done, return the sampled points
return samples
############################################################################################################################################
def set_to_model(self,parameters):
"""
Set the current model of the emulator to the one specified by the parameter set
:param parameters: parameters for which the feature will be emulated
:type parameters: array.
"""
#assert parameters.shape[0]==self.parameter_set.shape[1]
#if not hasattr(self,"_interpolator"):
# self.train()
#self._current_model_parameters = parameters
#self._current_predicted_feature = self.predict(parameters)
#self._current_interpolated_feature = interp1d(self.feature_label,self._current_predicted_feature)
raise NotImplementedError
def emulate(self,new_feature_label):
"""
Compute an emulated feature at the new feature label specified (multipoles, thresholds, ...) for the current model, using a linear interpolation between bins
:param new_feature_label: new feature label for which you want to emulate the feature
:type new_feature_label: array.
:returns: the emulated feature
"""
#return self._current_interpolated_feature(new_feature_label)
raise NotImplementedError
class EmulatorPanel(Panel):
@property
def _constructor_sliced(self):
return Emulator
#########################################################################################################################################################################################
###########################################################################
###########Hack to make scipy interpolate objects pickleable###############
###########################################################################
class _interpolate_wrapper(object):
def __init__(self,f,args,kwargs):
self.f = f
self.args = args
self.kwargs = kwargs
def __call__(self):
try:
return self.f(*self.args,**self.kwargs)
except:
import traceback
print("lenstools: Exception while building the interpolators")
print(" exception:")
traceback.print_exc()
raise