[ENH] Log normal distribution #22 #214

Closed
wants to merge 9 commits
Changes from all commits

2 changes: 1 addition & 1 deletion .gitignore
@@ -11,7 +11,7 @@ __pycache__/*
.cache/*
.*.swp
*/.ipynb_checkpoints/*

*.pyc
# Project files
.ropeproject
.project
4 changes: 3 additions & 1 deletion skpro/distributions/__init__.py
@@ -3,7 +3,9 @@
# copyright: skpro developers, BSD-3-Clause License (see LICENSE file)
# adapted from sktime

__all__ = ["Laplace", "Normal"]
__all__ = ["Log-Normal","Empirical", "Laplace", "Normal"]
Collaborator:
What state of the repository are you branching off from? This does not look like the most recent version; it looks like it is half a year old. Please make sure you update your fork regularly.


from skpro.distributions.empirical import Empirical
from skpro.distributions.laplace import Laplace
from skpro.distributions.normal import Normal
from skpro.distributions.log_normal import LogNormal
419 changes: 419 additions & 0 deletions skpro/distributions/empirical.py
@@ -0,0 +1,419 @@
# -*- coding: utf-8 -*-
# copyright: skpro developers, BSD-3-Clause License (see LICENSE file)
"""Empirical distribution."""

__author__ = ["fkiraly"]

import numpy as np
import pandas as pd

from skpro.distributions.base import BaseDistribution


class Empirical(BaseDistribution):
"""Empirical distribution (sktime native).
Parameters
----------
spl : pd.DataFrame with pd.MultiIndex
empirical sample
last (highest) index is time, first (lowest) index is sample
weights : pd.Series, with same index and length as spl, optional, default=None
if not passed, ``spl`` is assumed to be unweighted
time_indep : bool, optional, default=True
if True, ``sample`` will sample individual time indices independently
if False, ``sample`` will sample entire instances from ``spl``
index : pd.Index, optional, default = RangeIndex
columns : pd.Index, optional, default = RangeIndex
Example
-------
>>> import pandas as pd
>>> from skpro.distributions.empirical import Empirical
>>> spl_idx = pd.MultiIndex.from_product(
... [[0, 1], [0, 1, 2]], names=["sample", "time"]
... )
>>> spl = pd.DataFrame(
... [[0, 1], [2, 3], [10, 11], [6, 7], [8, 9], [4, 5]],
... index=spl_idx,
... columns=["a", "b"],
... )
>>> dist = Empirical(spl)
>>> empirical_sample = dist.sample(3)
"""

_tags = {
"capabilities:approx": [],
"capabilities:exact": ["mean", "var", "energy", "cdf", "ppf"],
"distr:measuretype": "discrete",
}

def __init__(self, spl, weights=None, time_indep=True, index=None, columns=None):
self.spl = spl
self.weights = weights
self.time_indep = time_indep
self.index = index
self.columns = columns

_timestamps = spl.index.get_level_values(-1).unique()
_spl_instances = spl.index.droplevel(-1).unique()
self._timestamps = _timestamps
self._spl_instances = _spl_instances
self._N = len(_spl_instances)

if index is None:
index = pd.Index(_timestamps)

if columns is None:
columns = spl.columns

super().__init__(index=index, columns=columns)

# initialize sorted samples
self._init_sorted()

def _init_sorted(self):
"""Initialize sorted version of spl."""
times = self._timestamps
cols = self.columns

sorted = {}
weights = {}
for t in times:
sorted[t] = {}
weights[t] = {}
for col in cols:
spl_t = self.spl.loc[(slice(None), t), col].values
sorter = np.argsort(spl_t)
spl_t_sorted = spl_t[sorter]
sorted[t][col] = spl_t_sorted
if self.weights is not None:
weights_t = self.weights.loc[(slice(None), t)].values
weights_t_sorted = weights_t[sorter]
weights[t][col] = weights_t_sorted
else:
ones = np.ones(len(spl_t_sorted))
weights[t][col] = ones

self._sorted = sorted
self._weights = weights

def _apply_per_ix(self, func, params, x=None):
"""Apply function per index."""
sorted = self._sorted
weights = self._weights

if x is not None and hasattr(x, "index"):
index = x.index
else:
index = self.index
if x is not None and hasattr(x, "columns"):
cols = x.columns
else:
cols = self.columns

res = pd.DataFrame(index=index, columns=cols)
for ix in index:
for col in cols:
spl_t = sorted[ix][col]
weights_t = weights[ix][col]
if x is None:
x_t = None
elif hasattr(x, "loc"):
x_t = x.loc[ix, col]
else:
x_t = x
res.loc[ix, col] = func(spl=spl_t, weights=weights_t, x=x_t, **params)
return res.convert_dtypes()

def energy(self, x=None):
r"""Energy of self, w.r.t. self or a constant frame x.
Let :math:`X, Y` be i.i.d. random variables with the distribution of `self`.
If `x` is `None`, returns :math:`\mathbb{E}[|X-Y|]` (per row), "self-energy".
If `x` is passed, returns :math:`\mathbb{E}[|X-x|]` (per row), "energy wrt x".
Parameters
----------
x : None or pd.DataFrame, optional, default=None
if pd.DataFrame, must have same rows and columns as `self`
Returns
-------
pd.DataFrame with same rows as `self`, single column `"energy"`
each row contains one float, self-energy/energy as described above.
"""
energy = self._apply_per_ix(_energy_np, {"assume_sorted": True}, x=x)
res = pd.DataFrame(energy.sum(axis=1), columns=["energy"])
return res

def mean(self):
r"""Return expected value of the distribution.
Let :math:`X` be a random variable with the distribution of `self`.
Returns the expectation :math:`\mathbb{E}[X]`
Returns
-------
pd.DataFrame with same rows, columns as `self`
expected value of distribution (entry-wise)
"""
spl = self.spl
if self.weights is None:
mean_df = spl.groupby(level=-1).mean()
else:
mean_df = spl.groupby(level=-1).apply(
lambda x: np.average(x, weights=self.weights.loc[x.index], axis=0)
)
mean_df = pd.DataFrame(mean_df.tolist(), index=mean_df.index)
mean_df.columns = spl.columns

return mean_df

def var(self):
r"""Return element/entry-wise variance of the distribution.
Let :math:`X` be a random variable with the distribution of `self`.
Returns :math:`\mathbb{V}[X] = \mathbb{E}\left(X - \mathbb{E}[X]\right)^2`
Returns
-------
pd.DataFrame with same rows, columns as `self`
variance of distribution (entry-wise)
"""
spl = self.spl
N = self._N
if self.weights is None:
var_df = spl.groupby(level=-1).var(ddof=0)
else:
mean = self.mean()
means = pd.concat([mean] * N, axis=0, keys=self._spl_instances)
var_df = spl.groupby(level=-1).apply(
lambda x: np.average(
(x - means.loc[x.index]) ** 2,
weights=self.weights.loc[x.index],
axis=0,
)
)
var_df = pd.DataFrame(
var_df.tolist(), index=var_df.index, columns=spl.columns
)
return var_df

def cdf(self, x):
"""Cumulative distribution function."""
cdf_val = self._apply_per_ix(_cdf_np, {"assume_sorted": True}, x=x)
return cdf_val

def ppf(self, p):
"""Quantile function = percent point function = inverse cdf."""
ppf_val = self._apply_per_ix(_ppf_np, {"assume_sorted": True}, x=p)
return ppf_val

def sample(self, n_samples=None):
"""Sample from the distribution.
Parameters
----------
n_samples : int, optional, default = None
Returns
-------
if `n_samples` is `None`:
returns a sample that contains a single sample from `self`,
in `pd.DataFrame` mtype format convention, with `index` and `columns` as `self`
if n_samples is `int`:
returns a `pd.DataFrame` that contains `n_samples` i.i.d. samples from `self`,
in `pd-multiindex` mtype format convention, with same `columns` as `self`,
and `MultiIndex` that is product of `RangeIndex(n_samples)` and `self.index`
"""
spl = self.spl
timestamps = self._timestamps
weights = self.weights

if n_samples is None:
n_samples = 1
n_samples_was_none = True
else:
n_samples_was_none = False
smpls = []

for _ in range(n_samples):
smpls_i = []
for t in timestamps:
spl_from = spl.loc[(slice(None), t), :]
if weights is not None:
spl_weights = weights.loc[(slice(None), t)].values
else:
spl_weights = None
spl_time = spl_from.sample(n=1, replace=True, weights=spl_weights)
spl_time = spl_time.droplevel(0)
smpls_i.append(spl_time)
spl_i = pd.concat(smpls_i, axis=0)
smpls.append(spl_i)

spl = pd.concat(smpls, axis=0, keys=range(n_samples))
if n_samples_was_none:
spl = spl.droplevel(0)

return spl

@classmethod
def get_test_params(cls, parameter_set="default"):
"""Return testing parameter settings for the estimator."""
# params1 is a DataFrame with simple row multiindex
spl_idx = pd.MultiIndex.from_product(
[[0, 1], [0, 1, 2]], names=["sample", "time"]
)
spl = pd.DataFrame(
[[0, 1], [2, 3], [10, 11], [6, 7], [8, 9], [4, 5]],
index=spl_idx,
columns=["a", "b"],
)
params1 = {
"spl": spl,
"weights": None,
"time_indep": True,
"index": pd.RangeIndex(3),
"columns": pd.Index(["a", "b"]),
}

# params2 is weighted
params2 = {
"spl": spl,
"weights": pd.Series([0.5, 0.5, 0.5, 1, 1, 1.1], index=spl_idx),
"time_indep": False,
"index": pd.RangeIndex(3),
"columns": pd.Index(["a", "b"]),
}
return [params1, params2]


def _energy_np(spl, x=None, weights=None, assume_sorted=False):
r"""Compute sample energy, fast numpy based subroutine.
Let :math:`X` be the random variable with support being
values of `spl`, with probability weights `weights`.
This function then returns :math:`\mathbb{E}[|X-Y|]`, with :math:`Y` an
independent copy of :math:`X`, if `x` is `None`.
If `x` is passed, returns :math:`\mathbb{E}[|X-x|]`.
Parameters
----------
spl : 1D np.ndarray
empirical sample
x : None or float, optional, default=None
if None, computes self-energy, if float, computes energy wrt x
weights : None or 1D np.ndarray, optional, default=None
if None, computes unweighted energy, if 1D np.ndarray, computes weighted energy
if not None, must be of same length as ``spl``, need not be normalized
assume_sorted : bool, optional, default=False
if True, assumes that ``spl`` is sorted in ascending order
Returns
-------
float
energy as described above
"""
if weights is None:
weights = np.ones(len(spl))

if not assume_sorted:
sorter = np.argsort(spl)
spl = spl[sorter]
weights = weights[sorter]

w_sum = np.sum(weights)
weights = weights / w_sum

spl_diff = np.diff(spl)

if x is None:
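# uses the identity E|X-Y| = 2 * integral of F(t) * (1 - F(t)) dt; for a sorted
# sample spl with normalized weights w this becomes
# E|X-Y| = 2 * sum_i F_i * (1 - F_i) * (spl[i+1] - spl[i]), where F_i = w_0 + ... + w_i,
# so that below cum_fwd[i] = F_i and cum_back[i] = 1 - F_i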
cum_fwd = np.cumsum(weights[:-1])
cum_back = np.cumsum(weights[1:][::-1])[::-1]
energy = 2 * np.sum(cum_fwd * cum_back * spl_diff)
else:
spl_diff = np.abs(spl - x)
energy = np.sum(weights * spl_diff)

return energy


def _cdf_np(spl, x, weights=None, assume_sorted=False):
"""Compute empirical cdf, fast numpy based subroutine.
Parameters
----------
spl : 1D np.ndarray
empirical sample
x : float
value at which to evaluate cdf
weights : None or 1D np.ndarray, optional, default=None
if None, computes unweighted cdf, if 1D np.ndarray, computes weighted cdf
if not None, must be of same length as ``spl``, need not be normalized
assume_sorted : bool, optional, default=False
if True, assumes that ``spl`` is sorted in ascending order
Returns
-------
cdf_val : float
cdf-value at x
"""
if weights is None:
weights = np.ones(len(spl))

if not assume_sorted:
sorter = np.argsort(spl)
spl = spl[sorter]
weights = weights[sorter]

w_sum = np.sum(weights)
weights = weights / w_sum

weights_select = weights[spl <= x]
cdf_val = np.sum(weights_select)

return cdf_val


def _ppf_np(spl, x, weights=None, assume_sorted=False):
"""Compute empirical ppf, fast numpy based subroutine.
Parameters
----------
spl : 1D np.ndarray
empirical sample
x : float
probability at which to evaluate ppf
weights : None or 1D np.ndarray, optional, default=None
if None, computes unweighted ppf, if 1D np.ndarray, computes weighted ppf
if not None, must be of same length as ``spl``, need not be normalized
assume_sorted : bool, optional, default=False
if True, assumes that ``spl`` is sorted in ascending order
Returns
-------
ppf_val : float
ppf-value at x
"""
if weights is None:
weights = np.ones(len(spl))

if not assume_sorted:
sorter = np.argsort(spl)
spl = spl[sorter]
weights = weights[sorter]

w_sum = np.sum(weights)
weights = weights / w_sum

cum_weights = np.cumsum(weights)
ix_val = np.searchsorted(cum_weights, x)
ppf_val = spl[ix_val]

return ppf_val
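
To see the Empirical methods above in action, here is a minimal usage sketch reusing the docstring example (illustration only, assuming skpro with this module installed):

import pandas as pd
from skpro.distributions.empirical import Empirical

spl_idx = pd.MultiIndex.from_product([[0, 1], [0, 1, 2]], names=["sample", "time"])
spl = pd.DataFrame(
    [[0, 1], [2, 3], [10, 11], [6, 7], [8, 9], [4, 5]],
    index=spl_idx,
    columns=["a", "b"],
)
dist = Empirical(spl)

print(dist.mean())            # per-time average over the two samples
print(dist.cdf(dist.mean()))  # empirical cdf, evaluated entry-wise at the mean
print(dist.sample(2))         # 2 i.i.d. samples, MultiIndex (sample, time)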
4 changes: 2 additions & 2 deletions skpro/distributions/laplace.py
@@ -1,6 +1,6 @@
# -*- coding: utf-8 -*-
# copyright: sktime developers, BSD-3-Clause License (see LICENSE file)
"""Normal/Gaussian probability distribution."""
# copyright: skpro developers, BSD-3-Clause License (see LICENSE file)
"""Laplace probability distribution."""

__author__ = ["fkiraly"]

178 changes: 178 additions & 0 deletions skpro/distributions/log_normal.py
@@ -0,0 +1,178 @@
# -*- coding: utf-8 -*-
# copyright: skpro developers, BSD-3-Clause License (see LICENSE file)
"""Log-Normal probability distribution."""

__author__ = ["fkiraly"]

import numpy as np
import pandas as pd
from scipy.special import erf, erfinv

from skpro.distributions.base import BaseDistribution


class LogNormal(BaseDistribution):
"""Log-Normal distribution (sktime native).
Parameters
----------
mu : float or array of float (1D or 2D)
mean of the distribution of the variable's natural logarithm
sigma : float or array of float (1D or 2D), must be positive
standard deviation of the distribution of the variable's natural logarithm
index : pd.Index, optional, default = RangeIndex
columns : pd.Index, optional, default = RangeIndex
Example
-------
>>> from skpro.distributions.log_normal import LogNormal
>>> n = LogNormal(mu=[[0, 1], [2, 3], [4, 5]], sigma=1)
"""

_tags = {
"capabilities:approx": ["pdflognorm"],
"capabilities:exact": ["mean", "var", "energy", "pdf", "log_pdf", "cdf", "ppf"],
"distr:measuretype": "continuous",
}

def __init__(self, mu, sigma, index=None, columns=None):

self.mu = mu
self.sigma = sigma
self.index = index
self.columns = columns

# todo: untangle index handling
# and broadcast of parameters.
# move this functionality to the base class
self._mu, self._sigma = self._get_bc_params()
shape = self._mu.shape

if index is None:
index = pd.RangeIndex(shape[0])

if columns is None:
columns = pd.RangeIndex(shape[1])

super(LogNormal, self).__init__(index=index, columns=columns)

def _get_bc_params(self):
"""Fully broadcast parameters of self, given param shapes and index, columns."""
to_broadcast = [self.mu, self.sigma]
if hasattr(self, "index") and self.index is not None:
to_broadcast += [self.index.to_numpy().reshape(-1, 1)]
if hasattr(self, "columns") and self.columns is not None:
to_broadcast += [self.columns.to_numpy()]
bc = np.broadcast_arrays(*to_broadcast)
return bc[0], bc[1]

def energy(self, x=None):
r"""Energy of self, w.r.t. self or a constant frame x.
Let :math:`X, Y` be i.i.d. random variables with the distribution of `self`.
If `x` is `None`, returns :math:`\mathbb{E}[|X-Y|]` (per row), "self-energy".
If `x` is passed, returns :math:`\mathbb{E}[|X-x|]` (per row), "energy wrt x".
Parameters
----------
x : None or pd.DataFrame, optional, default=None
if pd.DataFrame, must have same rows and columns as `self`
Returns
-------
pd.DataFrame with same rows as `self`, single column `"energy"`
each row contains one float, self-energy/energy as described above.
"""
approx_spl_size = self.get_tag("approx_energy_spl")
approx_method = (
"by approximating the energy expectation by the arithmetic mean of "
f"{approx_spl_size} samples"
)

# splx, sply = i.i.d. samples of X - Y of size N = approx_spl_size
N = approx_spl_size
if x is None:
splx = self.sample(N)
sply = self.sample(N)
# approx E[abs(X-Y)] via mean of samples of abs(X-Y) obtained from splx, sply
spl = splx - sply
energy = spl.apply(np.linalg.norm, axis=1, ord=1).groupby(level=1).mean()
energy = pd.DataFrame(energy, index=self.index, columns=["energy"])
else:
d = self.loc[x.index, x.columns]
mu_arr, sd_arr = d._mu, d._sigma

c_arr = x * (2 * self.cdf(x) - 1) - 2 * np.exp((mu_arr + sd_arr**2) / 2) * (
    self.cdf((np.log(x) - mu_arr - sd_arr**2) / sd_arr)
    + self.cdf(sd_arr / mu_arr**0.5)
    - 1
)
Collaborator:
Could you kindly write down the formula in math, or explain otherwise how you are getting this expression?

Contributor (author):
Hey, so this is basically the CRPS score, which is E[|X-x|] - 0.5 E[|X-X'|], from which I was unable to isolate the first term. Wolfram Alpha too wasn't able to produce a closed form for the integral. Can we change the description of the energy method accordingly, or would you rather we go by the approximation in the base class?

Collaborator:
Why don't we spend a short discussion trying to see whether we can get somewhere? Removing it entirely is always an option.

A "trick" to isolate the first term is to observe that adding the same constant to x and X (so, the location parameter) should leave the formula unchanged.

Regarding the integral, which integral concretely are you feeding into Wolfram Alpha?

Contributor (bhavikar04), Mar 23, 2024:
Yes, so this is what I got. If I try to add limits, it crashes. Am I misinterpreting the integral?
[Screenshot: Wolfram Alpha output, an antiderivative in terms of erf]

Collaborator (fkiraly), Mar 23, 2024:
This looks correct. Now you need to add the limits. That should be an easy substitution, no? I recommend doing that manually. Use that

$\lim_{x\rightarrow -\infty} \mbox{erf}(x) = -1$ and $\lim_{x\rightarrow \infty} \mbox{erf}(x) = 1$. You need to be careful with the sign, but that should be it?

The number 0.707 etc. should be $\frac{1}{2} \sqrt{2}$, but it doesn't matter for the limits.

Collaborator:
moved discussion here: #219

energy_arr = np.sum(c_arr, axis=1)
energy = pd.DataFrame(energy_arr, index=self.index, columns=["energy"])
return energy
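
For reference, the closed forms at stake in the review discussion above are standard log-normal results, stated here for orientation and not taken from this PR (writing $\Phi$ for the standard normal cdf and $F(x) = \Phi((\ln x - \mu)/\sigma)$ for the log-normal cdf):

$$\mathbb{E}|X - x| = x\,(2F(x) - 1) + e^{\mu + \sigma^2/2}\left(1 - 2\,\Phi\!\left(\frac{\ln x - \mu - \sigma^2}{\sigma}\right)\right), \qquad \mathbb{E}|X - X'| = 2\, e^{\mu + \sigma^2/2}\left(2\,\Phi\!\left(\frac{\sigma}{\sqrt{2}}\right) - 1\right),$$

which recombine into the log-normal CRPS mentioned in the thread via $\mathrm{CRPS}(F, x) = \mathbb{E}|X - x| - \tfrac{1}{2}\,\mathbb{E}|X - X'|$.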

def mean(self):
r"""Return expected value of the distribution.
Let :math:`X` be a random variable with the distribution of `self`.
Returns the expectation :math:`\mathbb{E}[X]`
Returns
-------
pd.DataFrame with same rows, columns as `self`
expected value of distribution (entry-wise)
"""
mean_arr = np.exp(self._mu + self._sigma**2/2)
return pd.DataFrame(mean_arr, index=self.index, columns=self.columns)

def var(self):
r"""Return element/entry-wise variance of the distribution.
Let :math:`X` be a random variable with the distribution of `self`.
Returns :math:`\mathbb{V}[X] = \mathbb{E}\left(X - \mathbb{E}[X]\right)^2`
Returns
-------
pd.DataFrame with same rows, columns as `self`
variance of distribution (entry-wise)
"""
var_arr = np.exp(2 * self._mu + 2 * self._sigma**2) - np.exp(2 * self._mu + self._sigma**2)
return pd.DataFrame(var_arr, index=self.index, columns=self.columns)

def pdf(self, x):
"""Probability density function."""
d = self.loc[x.index, x.columns]
pdf_arr = np.exp(-0.5 * ((np.log(x.values) - d.mu) / d.sigma) ** 2)
pdf_arr = pdf_arr / (x.values*d.sigma * np.sqrt(2 * np.pi))
return pd.DataFrame(pdf_arr, index=x.index, columns=x.columns)

def log_pdf(self, x):
"""Logarithmic probability density function."""
d = self.loc[x.index, x.columns]
lpdf_arr = -0.5 * ((np.log(x.values) - d.mu) / d.sigma) ** 2
lpdf_arr = lpdf_arr - np.log(x.values*d.sigma * np.sqrt(2 * np.pi))
return pd.DataFrame(lpdf_arr, index=x.index, columns=x.columns)

def cdf(self, x):
"""Cumulative distribution function."""
d = self.loc[x.index, x.columns]
cdf_arr = 0.5 + 0.5 * erf((np.log(x.values) - d.mu) / (d.sigma * np.sqrt(2)))
return pd.DataFrame(cdf_arr, index=x.index, columns=x.columns)

def ppf(self, p):
"""Quantile function = percent point function = inverse cdf."""
d = self.loc[p.index, p.columns]
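# the cdf is F(x) = Phi((log(x) - mu) / sigma), with Phi the standard normal cdf,
# so the quantile function is F^{-1}(p) = exp(mu + sigma * sqrt(2) * erfinv(2p - 1))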
icdf_arr = d.mu + d.sigma * np.sqrt(2) * erfinv(2 * p.values - 1)
icdf_arr = np.exp(icdf_arr)
return pd.DataFrame(icdf_arr, index=p.index, columns=p.columns)

@classmethod
def get_test_params(cls, parameter_set="default"):
"""Return testing parameter settings for the estimator."""
params1 = {"mu": [[0, 1], [2, 3], [4, 5]], "sigma": 1}
params2 = {
"mu": 0,
"sigma": 1,
"index": pd.Index([1, 2, 5]),
"columns": pd.Index(["a", "b"]),
}
return [params1, params2]
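
As a rough sanity check of the LogNormal methods above, one can compare them against scipy.stats.lognorm, which uses the parametrization s=sigma, scale=exp(mu). A minimal sketch, assuming the corrected var and ppf shown above (illustration only):

import numpy as np
import pandas as pd
from scipy.stats import lognorm

from skpro.distributions.log_normal import LogNormal

mu = np.array([[0.0, 1.0], [2.0, 3.0], [4.0, 5.0]])
sigma = 1.0

d = LogNormal(mu=mu.tolist(), sigma=sigma)
ref = lognorm(s=sigma, scale=np.exp(mu))  # same distribution in scipy's parametrization

# entry-wise mean and variance should agree with scipy
print(np.allclose(d.mean().values.astype(float), ref.mean()))
print(np.allclose(d.var().values.astype(float), ref.var()))

# cdf at the median exp(mu) should be 0.5, and ppf(0.5) should recover the median
x = pd.DataFrame(np.exp(mu), index=d.index, columns=d.columns)
p = pd.DataFrame(0.5, index=d.index, columns=d.columns)
print(np.allclose(d.cdf(x).values.astype(float), 0.5))
print(np.allclose(d.ppf(p).values.astype(float), np.exp(mu)))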