Skip to content

Commit

Permalink
Implement AIC for maxlike
Browse files Browse the repository at this point in the history
  • Loading branch information
ppegolo committed Jun 23, 2024
1 parent a30c535 commit 2a3a7d3
Show file tree
Hide file tree
Showing 3 changed files with 303 additions and 180 deletions.
244 changes: 161 additions & 83 deletions examples/06_example_maxlike.ipynb

Large diffs are not rendered by default.

61 changes: 51 additions & 10 deletions sportran/current/current.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from sportran.utils import log
from sportran.plotter.current import CurrentPlotter
import warnings
from copy import deepcopy

__all__ = ['Current']

Expand Down Expand Up @@ -399,10 +400,11 @@ def bayesian_analysis(self, model, n_parameters,

def maxlike_estimate(self,
model,
n_parameters,
n_parameters = 'AIC',
mask = None,
likelihood = 'wishart',
solver = 'BFGS'
solver = 'BFGS',
guess_runave_window = 50,
):

if likelihood.lower() != 'wishart':
Expand All @@ -411,15 +413,54 @@ def maxlike_estimate(self,
if likelihood.lower() == 'wishart':
data = self.cospectrum.real # TODO figure out this business of real vs abs value

self.maxlike = MaxLikeFilter(data,
model,
n_parameters,
self.N_EQUIV_COMPONENTS,
mask = mask)
# Define MaxLikeFilter object
self.maxlike = MaxLikeFilter()

if isinstance(n_parameters, int):
do_AIC = False
# Minimize the negative log-likelihood with a fixed number of parameters
self.maxlike.maxlike(model = model,
data = data,
mask = mask,
n_components = self.N_EQUIV_COMPONENTS,
guess_runave_window = guess_runave_window,
n_parameters = n_parameters,
likelihood = likelihood,
solver = solver)

elif isinstance(n_parameters, list) or isinstance(n_parameters, np.ndarray):
do_AIC = True
assert np.issubdtype(np.asarray(n_parameters).dtype, np.integer), "`n_parameter` must be an integer array-like"
log.write_log(f'Optimal number of parameters between {np.min(n_parameters)} and {np.max(n_parameters)} chosen by AIC')

elif (isinstance(n_parameters, str) and n_parameters.lower() == 'aic'):
do_AIC = True
n_parameters = np.arange(3, 40)
log.write_log('Optimal number of parameters between 3 and 40 chosen by AIC')

if do_AIC:
# Minimize the negative log-likelihood on a range of parameters and choose the best one with the AIC
_aic = []
_filters = []
for n_par in n_parameters:
log.write_log(f'n_parameters = {n_par}')
self.maxlike.maxlike(model = model,
data = data,
mask = mask,
n_components = self.N_EQUIV_COMPONENTS,
guess_runave_window = guess_runave_window,
n_parameters = n_par,
likelihood = likelihood,
solver = solver,
write_log = False)
_aic.append(self.maxlike.log_likelihood_value - n_par)
_filters.append(deepcopy(self.maxlike))
self.optimal_nparameters = n_parameters[np.argmax(_aic)]
self.maxlike = _filters[np.argmax(_aic)]
self.aic = np.max(_aic)
del _filters
del _aic

# TODO: go on from here
self.maxlike.maxlike(likelihood = likelihood, solver = solver)

# self.estimate = self.maxlike.parameters_mean[0]*self.maxlike.factor

# try:
Expand Down
178 changes: 91 additions & 87 deletions sportran/md/maxlike.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,17 +40,10 @@ class MaxLikeFilter(object):
TODO
"""

def __init__(self, data, model, n_parameters, n_components, mask = None):

assert isinstance(data, np.ndarray), "Noisy data should be numpy.ndarray"
assert data.shape[0] == data.shape[1] and len(data.shape)==3, 'Noisy data should be a 2x2xN numpy.ndarray'

self.data = data
self.model = model
self.mask = mask
self.n_components = n_components
self.n_parameters = n_parameters

def __init__(self):
# TODO: it's likely better to define some quantities here rather than in self.maxlike
log.write_log("MaxLikeFilter")

def __repr__(self):
msg = 'MaxLikeFilter:\n' #+ \
# ' AIC type = {:}\n'.format(self.aic_type) + \
Expand All @@ -69,114 +62,125 @@ def __repr__(self):

def maxlike(
self,
data = None,
model = None,
n_parameters = None,
likelihood = None,
solver = None,
mask = None,
guess_runave_window = 50
n_components = None,
guess_runave_window = 50,
omega_fixed = None,
write_log = True
):

def log_likelihood_wishart(w, model, omega, omega_fixed, data_, nu, ell):
'''
Logarithm of the Wishart probability density function.
'''
n = ell
p = nu
multig = multigammaln(0.5*n, p)

# Compute scale matrix from the model (symmetrize to ensure positive definiteness)
spline = model(omega_fixed, w)
V = spline(omega)
V = opt_einsum.contract('wba,wbc->wac', V, V) / n # equiv to V.T@V for each frequency

# The argument of the PDF is the data
X = data_

# Determinant of X
a, b, d = X[...,0,0], X[...,0,1], X[...,1,1]
detX = a*d - b**2

# Determinant and inverse of V
a, b, d = V[...,0,0], V[...,0,1], V[...,1,1]
invV = (1/(a*d - b**2)*np.array([[d, -b],[-b, a]])).transpose(2,0,1)
detV = a*d - b**2

# Trace of the matrix product between the inverse of V and X
trinvV_X = opt_einsum.contract('wab,wba->w', invV, X)

# if detV.min() < 0 or detX.min() < 0:
# print(detV.min(), detX.min())

# Sum pieces of the log-likelihood
log_pdf = - (0.5*(-n*p*LOG2 - n*np.log(detV) + (n-p-1)*np.log(detX) - trinvV_X) - multig)

return np.sum(log_pdf)


if n_parameters is None:
n_parameters = self.n_parameters
# Define internal variables for consistent notation
assert n_parameters is not None
self.n_parameters = n_parameters

assert likelihood is not None
assert solver is not None

# Define internal variables for consistent notation
if write_log:
log.write_log('Maximum-likelihood estimate with {} parameters'.format(n_parameters))

nu = 2
ell = self.n_components
data = self.data
omega = np.arange(data.shape[-1])
args = np.int32(np.linspace(0, data.shape[-1] - 1, n_parameters, endpoint = True))
omega_fixed = omega[args]
self.omega = omega
self.omega_fixed = omega_fixed
model = self.model
ell = n_components

if likelihood.lower() == 'wishart':
assert likelihood is not None
likelihood = likelihood.lower()
if likelihood == 'wishart':
log_like = self.log_likelihood_wishart
elif likelihood == 'chisquare' or likelihood == 'chisquared':
log_like = self.log_likelihood_diag
elif likelihood == 'variancegamma' or likelihood == 'variance-gamma':
log_like = self.log_likelihood_offdiag
else:
raise NotImplementedError("Currently implemented likelihoods are: `wishart`, `chisquare`, `variance-gamma`")

assert data is not None, "`data` must be provided"
if mask is not None:
self.data = data[mask]
else:
self.data = data

# Check data is consistently shaped
assert len(data.shape) == 1 or len(data.shape) == 3, "`data` should be a 1d array (diagonal or off-diagonal estimates) or a 3d array (Wishart matrix estimate)"
if len(data.shape) == 3:
assert likelihood == 'wishart', f"Misshaped `data` for {likelihood} likelihood."
assert data.shape[0] == data.shape[1] == 2, "`data` for a Wishart esimate must be a (2,2,N) array."
else:
assert likelihood != 'wishart', f"Misshaped `data` for {likelihood} likelihood."

# Define initial guess for the optimization
try:
guess_data = np.array([runavefilter(c, guess_runave_window) for c in data.reshape(-1,data.shape[-1])]).reshape(data.shape)
#TODO: worth using pandas or irrelevant?
# np.array([pd.Series(data_wishart[:,i]).rolling(window=50,
# closed = 'left',
# min_periods = 0,
# center = True).mean().to_numpy() for i in range(4)]).T.reshape(-1, 2, 2)
except:
log.write_log(f'Guessing data with a running average with a {guess_runave_window} large window failed. Window changed to 10.')
guess_runave_window = 10
guess_data = np.array([runavefilter(c, guess_runave_window) for c in data.reshape(-1,data.shape[-1])]).reshape(data.shape)
# TODO: a lot of massaging: simplify?
guess_data = np.array([guess_data[:,:,j] for j in [np.argmin(np.abs(omega-omega_fixed[i])) for i in range(len(omega_fixed))]])
guess_data = np.array([cholesky(g, lower = False) for g in guess_data])/np.sqrt(ell)
guess_data = np.array([guess_data[:,0,0], guess_data[:,0,1], guess_data[:,1,1]]).reshape(-1)
# Frequency
omega = np.arange(data.shape[-1])
self.omega = omega

data = data.transpose(2, 0, 1)
# Spline nodes
if omega_fixed is None:
if write_log:
log.write_log("Spline nodes are equispaced from 0 to the Nyquist frequency.")
args = np.int32(np.linspace(0, data.shape[-1] - 1, n_parameters, endpoint = True))
omega_fixed = omega[args]
self.omega_fixed = omega_fixed

log.write_log('Maximum-likelihood estimate with {} parameters'.format(n_parameters))
# Spline model
assert model is not None
self.model = model

res = minimize(fun = log_likelihood_wishart,
# fun = lambda w, _model, _omega, _omega_fixed, _data, _nu, _ell: -log_like(w, _model, _omega, _omega_fixed, _data, _nu, _ell),
# Define initial guess for the optimization
guess_data = self.guess_data(data, omega, omega_fixed, ell, window = guess_runave_window)
data = np.moveaxis(data, data.shape.index(max(data.shape)), 0)

# Minimize the negative log-likelihood
res = minimize(fun = log_like,
x0 = guess_data,
args = (model, omega, omega_fixed, data, nu, ell),
method = solver)

# Evaluate the covariance of the parameters, if the selected solver allows it
try:
cov = res.hess_inv
log.write_log(f'The {solver} solver features the calculation of the Hessian. The covariance matrix will be estimated through the Laplace approximation.')
if write_log:
log.write_log(f'The {solver} solver features the calculation of the Hessian. The covariance matrix will be estimated through the Laplace approximation.')
except:
log.write_log(f'The {solver} solver does not feature the calculation of the Hessian. No covariance matrix will be output.')
if write_log:
log.write_log(f'The {solver} solver does not feature the calculation of the Hessian. No covariance matrix will be output.')
cov = None

self.parameters_mean = res.x
if cov is not None:
self.parameters_std = cov.diagonal()**0.5
self.data = data
self.parameters_std = np.sqrt(cov.diagonal())

self.optimizer_res = res
self.log_likelihood_value = -log_like(res.x, model, omega, omega_fixed, data, nu, ell)

################################################

# Helper functions

def guess_data(self, data, omega, omega_fixed, ell, window = 10, loglike = 'wishart'):
'''
Moving average of the input data as initial guess for the parameter esimation
'''

shape = data.shape

try:
guess_data = np.array([runavefilter(c, window) for c in data.reshape(-1, shape[-1])]).reshape(shape)
except:
log.write_log(f'Guessing data with a running average with a {window} large window failed. Window changed to 10.')
window = 10
guess_data = np.array([runavefilter(c, window) for c in data.reshape(-1, shape[-1])]).reshape(shape)

guess_data = np.array([guess_data[...,j] for j in [np.argmin(np.abs(omega - omega_fixed[i])) for i in range(len(omega_fixed))]])

if loglike == 'wishart':
guess_data = np.array([cholesky(g, lower = False) for g in guess_data]) / np.sqrt(ell)
guess_data = np.array([guess_data[:, 0, 0], guess_data[:, 0, 1], guess_data[:, 1, 1]]).reshape(-1)

return guess_data

def log_likelihood_wishart(w, model, omega, omega_fixed, data_, nu, ell):
def log_likelihood_wishart(self, w, model, omega, omega_fixed, data_, nu, ell):
'''
Logarithm of the Wishart probability density function.
'''
Expand Down

0 comments on commit 2a3a7d3

Please sign in to comment.