Skip to content

Commit

Permalink
Merge pull request #52 from simon-hirsch/cleanup
Browse files Browse the repository at this point in the history
Various cleanups and improvements.
  • Loading branch information
BerriJ authored Feb 21, 2025
2 parents 05fd2bb + 188e74a commit e7d8aed
Show file tree
Hide file tree
Showing 10 changed files with 160 additions and 115 deletions.
56 changes: 31 additions & 25 deletions src/rolch/distributions/gamma.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from typing import Tuple

import numpy as np
import scipy.special as spc
import scipy.stats as st
Expand Down Expand Up @@ -27,7 +29,6 @@ class DistributionGamma(Distribution):
$$
f(x, \\alpha, \\beta) = \\frac{\\beta^\\alpha x^{\\alpha - 1} \exp[-\\beta x]}{\Gamma(\\alpha)}
$$
with the paramters $\\alpha, \\beta >0$. The parameters can be mapped as follows:
$$
\\alpha = 1/\sigma^2 \Leftrightarrow \sigma = \sqrt{1 / \\alpha}
Expand All @@ -37,31 +38,32 @@ class DistributionGamma(Distribution):
\\beta = 1/(\sigma^2\mu).
$$
Args:
loc_link (LinkFunction, optional): The link function for $\mu$. Defaults to LogLink().
scale_link (LinkFunction, optional): The link function for $\sigma$. Defaults to LogLink().
"""

def __init__(
self, loc_link: LinkFunction = LogLink(), scale_link: LinkFunction = LogLink()
):
self.loc_link = loc_link
self.scale_link = scale_link
# Set up links as dict
self.links = {0: self.loc_link, 1: self.scale_link}
# Set distribution params
self.n_params = 2
self.corresponding_gamlss = "GA"
self.scipy_dist = st.gamma

def theta_to_params(self, theta):
self,
loc_link: LinkFunction = LogLink(),
scale_link: LinkFunction = LogLink(),
) -> None:
self.loc_link: LinkFunction = loc_link
self.scale_link: LinkFunction = scale_link
self.links: dict[int, LinkFunction] = {0: self.loc_link, 1: self.scale_link}
self.n_params: int = 2
self.corresponding_gamlss: str = "GA"
self.scipy_dist: st.rv_continuous = st.gamma

def theta_to_params(self, theta: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
mu = theta[:, 0]
sigma = theta[:, 1]
return mu, sigma

@staticmethod
def gamlss_to_scipy(mu: np.ndarray, sigma: np.ndarray):
def gamlss_to_scipy(
mu: np.ndarray, sigma: np.ndarray
) -> Tuple[np.ndarray, int, np.ndarray]:
"""Map GAMLSS Parameters to scipy parameters.
Args:
Expand All @@ -77,7 +79,7 @@ def gamlss_to_scipy(mu: np.ndarray, sigma: np.ndarray):
scale = 1 / beta
return alpha, loc, scale

def dl1_dp1(self, y, theta, param=0):
def dl1_dp1(self, y: np.ndarray, theta: np.ndarray, param: int = 0) -> np.ndarray:
mu, sigma = self.theta_to_params(theta)

if param == 0:
Expand All @@ -93,7 +95,7 @@ def dl1_dp1(self, y, theta, param=0):
+ spc.digamma(1 / (sigma**2))
)

def dl2_dp2(self, y, theta, param=0):
def dl2_dp2(self, y: np.ndarray, theta: np.ndarray, param: int = 0) -> np.ndarray:
mu, sigma = self.theta_to_params(theta)
if param == 0:
# MU
Expand All @@ -103,14 +105,16 @@ def dl2_dp2(self, y, theta, param=0):
# SIGMA
return (4 / sigma**4) - (4 / sigma**6) * spc.polygamma(1, (1 / sigma**2))

def dl2_dpp(self, y, theta, params=(0, 1)):
def dl2_dpp(
self, y: np.ndarray, theta: np.ndarray, params: Tuple[int, int] = (0, 1)
) -> np.ndarray:
if sorted(params) == [0, 1]:
return np.zeros_like(y)

def link_function(self, y, param=0):
def link_function(self, y: np.ndarray, param: int = 0) -> np.ndarray:
return self.links[param].link(y)

def link_inverse(self, y, param=0):
def link_inverse(self, y: np.ndarray, param: int = 0) -> np.ndarray:
return self.links[param].inverse(y)

def link_function_derivative(self, y: np.ndarray, param: int = 0) -> np.ndarray:
Expand All @@ -119,25 +123,27 @@ def link_function_derivative(self, y: np.ndarray, param: int = 0) -> np.ndarray:
def link_inverse_derivative(self, y: np.ndarray, param: int = 0) -> np.ndarray:
return self.links[param].inverse_derivative(y)

def initial_values(self, y, param=0, axis=None):
def initial_values(
self, y: np.ndarray, param: int = 0, axis: int = None
) -> np.ndarray:
if param == 0:
return np.repeat(np.mean(y, axis=None), y.shape[0])
if param == 1:
return np.ones_like(y)

def cdf(self, y, theta):
def cdf(self, y: np.ndarray, theta: np.ndarray) -> np.ndarray:
mu, sigma = self.theta_to_params(theta)
return self.scipy_dist(*self.gamlss_to_scipy(mu, sigma)).cdf(y)

def pdf(self, y, theta):
def pdf(self, y: np.ndarray, theta: np.ndarray) -> np.ndarray:
mu, sigma = self.theta_to_params(theta)
return self.scipy_dist(*self.gamlss_to_scipy(mu, sigma)).pdf(y)

def ppf(self, q, theta):
def ppf(self, q: np.ndarray, theta: np.ndarray) -> np.ndarray:
mu, sigma = self.theta_to_params(theta)
return self.scipy_dist(*self.gamlss_to_scipy(mu, sigma)).ppf(q)

def rvs(self, size, theta):
def rvs(self, size: int, theta: np.ndarray) -> np.ndarray:
mu, sigma = self.theta_to_params(theta)
return (
self.scipy_dist(*self.gamlss_to_scipy(mu, sigma))
Expand Down
51 changes: 28 additions & 23 deletions src/rolch/distributions/johnsonsu.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
from typing import Tuple

import numpy as np
import scipy.stats as st

from rolch.base import Distribution
from rolch.base import Distribution, LinkFunction
from rolch.link import IdentityLink, LogLink


Expand All @@ -18,11 +20,11 @@ class DistributionJSU(Distribution):

def __init__(
self,
loc_link=IdentityLink(),
scale_link=LogLink(),
shape_link=IdentityLink(),
tail_link=LogLink(),
):
loc_link: LinkFunction = IdentityLink(),
scale_link: LinkFunction = LogLink(),
shape_link: LinkFunction = IdentityLink(),
tail_link: LinkFunction = LogLink(),
) -> None:
self.n_params = 4
self.loc_link = loc_link
self.scale_link = scale_link
Expand All @@ -35,14 +37,16 @@ def __init__(
self.tail_link, # tail
]

def theta_to_params(self, theta):
def theta_to_params(
self, theta: np.ndarray
) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
mu = theta[:, 0]
sigma = theta[:, 1]
nu = theta[:, 2]
tau = theta[:, 3]
return mu, sigma, nu, tau

def dl1_dp1(self, y, theta, param=0):
def dl1_dp1(self, y: np.ndarray, theta: np.ndarray, param: int = 0) -> np.ndarray:
mu, sigma, nu, tau = self.theta_to_params(theta)

if param == 0:
Expand Down Expand Up @@ -77,7 +81,7 @@ def dl1_dp1(self, y, theta, param=0):
dldt = (1 + r * nu - r * r) / tau
return dldt

def dl2_dp2(self, y, theta, param=0):
def dl2_dp2(self, y: np.ndarray, theta: np.ndarray, param: int = 0) -> np.ndarray:
mu, sigma, nu, tau = self.theta_to_params(theta)
if param == 0:
# MU
Expand All @@ -87,7 +91,6 @@ def dl2_dp2(self, y, theta, param=0):
(r * tau) / (sigma * np.sqrt(z * z + 1))
)
d2ldm2 = -dldm * dldm
# d2ldm2 = np.max(1e-15, d2ldm2)
return d2ldm2

if param == 1:
Expand All @@ -98,25 +101,25 @@ def dl2_dp2(self, y, theta, param=0):
(r * tau * z) / (sigma * np.sqrt(z * z + 1))
)
d2ldd2 = -(dldd * dldd)
# d2ldd2 = np.max(d2ldd2, -1e-15)
return d2ldd2

if param == 2:
# TAIL
z = (y - mu) / sigma
r = nu + tau * np.arcsinh(z)
d2ldv2 = -(r * r)
# d2ldv2 = np.max(d2ldv2 < -1e-15)
return d2ldv2

if param == 3:
z = (y - mu) / sigma
r = nu + tau * np.arcsinh(z)
dldt = (1 + r * nu - r * r) / tau
d2ldt2 = -dldt * dldt
# d2ldt2 = np.max(d2ldt2, -1e-15)
return d2ldt2

def dl2_dpp(self, y, theta, params=(0, 1)):
def dl2_dpp(
self, y: np.ndarray, theta: np.ndarray, params: Tuple[int, int] = (0, 1)
) -> np.ndarray:
mu, sigma, nu, tau = self.theta_to_params(theta)
if sorted(params) == [0, 1]:
z = (y - mu) / sigma
Expand Down Expand Up @@ -178,10 +181,10 @@ def dl2_dpp(self, y, theta, params=(0, 1)):
d2ldvdt = -(dldv * dldt)
return d2ldvdt

def link_function(self, y, param=0):
def link_function(self, y: np.ndarray, param: int = 0) -> np.ndarray:
return self.links[param].link(y)

def link_inverse(self, y, param=0):
def link_inverse(self, y: np.ndarray, param: int = 0) -> np.ndarray:
return self.links[param].inverse(y)

def link_function_derivative(self, y: np.ndarray, param: int = 0) -> np.ndarray:
Expand All @@ -190,17 +193,19 @@ def link_function_derivative(self, y: np.ndarray, param: int = 0) -> np.ndarray:
def link_inverse_derivative(self, y: np.ndarray, param: int = 0) -> np.ndarray:
return self.links[param].inverse_derivative(y)

def initial_values(self, y, param=0, axis=None):
def initial_values(
self, y: np.ndarray, param: int = 0, axis: int = None
) -> np.ndarray:
if param == 0:
return np.repeat(np.mean(y, axis=None), y.shape[0])
return np.repeat(np.mean(y, axis=axis), y.shape[0])
if param == 1:
return np.repeat(np.std(y, axis=None), y.shape[0])
return np.repeat(np.std(y, axis=axis), y.shape[0])
if param == 2:
return np.full_like(y, 0)
if param == 3:
return np.full_like(y, 10)

def cdf(self, y, theta):
def cdf(self, y: np.ndarray, theta: np.ndarray) -> np.ndarray:
mu, sigma, nu, tau = self.theta_to_params(theta)
return st.johnsonsu(
loc=mu,
Expand All @@ -209,7 +214,7 @@ def cdf(self, y, theta):
b=tau,
).cdf(y)

def pdf(self, y, theta):
def pdf(self, y: np.ndarray, theta: np.ndarray) -> np.ndarray:
mu, sigma, nu, tau = self.theta_to_params(theta)
return st.johnsonsu(
loc=mu,
Expand All @@ -218,7 +223,7 @@ def pdf(self, y, theta):
b=tau,
).pdf(y)

def ppf(self, q, theta):
def ppf(self, q: np.ndarray, theta: np.ndarray) -> np.ndarray:
mu, sigma, nu, tau = self.theta_to_params(theta)
return st.johnsonsu(
loc=mu,
Expand All @@ -227,7 +232,7 @@ def ppf(self, q, theta):
b=tau,
).ppf(q)

def rvs(self, size, theta):
def rvs(self, size: int, theta: np.ndarray) -> np.ndarray:
mu, sigma, nu, tau = self.theta_to_params(theta)
return (
st.johnsonsu(
Expand Down
53 changes: 31 additions & 22 deletions src/rolch/distributions/normal.py
Original file line number Diff line number Diff line change
@@ -1,25 +1,31 @@
from typing import Tuple, Union

import numpy as np
import scipy.stats as st

from rolch.base import Distribution
from rolch.base import Distribution, LinkFunction
from rolch.link import IdentityLink, LogLink


class DistributionNormal(Distribution):
"""Corresponds to GAMLSS NO() and scipy.stats.norm()"""

def __init__(self, loc_link=IdentityLink(), scale_link=LogLink()):
self.n_params = 2
self.loc_link = loc_link
self.scale_link = scale_link
self.links = [self.loc_link, self.scale_link]

def theta_to_params(self, theta):
def __init__(
self,
loc_link: LinkFunction = IdentityLink(),
scale_link: LinkFunction = LogLink(),
) -> None:
self.n_params: int = 2
self.loc_link: LinkFunction = loc_link
self.scale_link: LinkFunction = scale_link
self.links: list[LinkFunction] = [self.loc_link, self.scale_link]

def theta_to_params(self, theta: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
mu = theta[:, 0]
sigma = theta[:, 1]
return mu, sigma

def dl1_dp1(self, y, theta, param=0):
def dl1_dp1(self, y: np.ndarray, theta: np.ndarray, param: int = 0) -> np.ndarray:
mu, sigma = self.theta_to_params(theta)

if param == 0:
Expand All @@ -28,8 +34,8 @@ def dl1_dp1(self, y, theta, param=0):
if param == 1:
return ((y - mu) ** 2 - sigma**2) / (sigma**3)

def dl2_dp2(self, y, theta, param=0):
mu, sigma = self.theta_to_params(theta)
def dl2_dp2(self, y: np.ndarray, theta: np.ndarray, param: int = 0) -> np.ndarray:
_, sigma = self.theta_to_params(theta)
if param == 0:
# MU
return -(1 / sigma**2)
Expand All @@ -38,15 +44,16 @@ def dl2_dp2(self, y, theta, param=0):
# SIGMA
return -(2 / (sigma**2))

def dl2_dpp(self, y, theta, params=(0, 1)):
mu, sigma = self.theta_to_params(theta)
def dl2_dpp(
self, y: np.ndarray, theta: np.ndarray, params: Tuple[int, int] = (0, 1)
) -> np.ndarray:
if sorted(params) == [0, 1]:
return np.zeros_like(y)

def link_function(self, y, param=0):
def link_function(self, y: np.ndarray, param: int = 0) -> np.ndarray:
return self.links[param].link(y)

def link_inverse(self, y, param=0):
def link_inverse(self, y: np.ndarray, param: int = 0) -> np.ndarray:
return self.links[param].inverse(y)

def link_function_derivative(self, y: np.ndarray, param: int = 0) -> np.ndarray:
Expand All @@ -55,24 +62,26 @@ def link_function_derivative(self, y: np.ndarray, param: int = 0) -> np.ndarray:
def link_inverse_derivative(self, y: np.ndarray, param: int = 0) -> np.ndarray:
return self.links[param].inverse_derivative(y)

def initial_values(self, y, param=0, axis=None):
def initial_values(
self, y: np.ndarray, param: int = 0, axis: Union[int, None] = None
) -> np.ndarray:
if param == 0:
return np.repeat(np.mean(y, axis=None), y.shape[0])
return np.repeat(np.mean(y, axis=axis), y.shape[0])
if param == 1:
return np.repeat(np.std(y, axis=None), y.shape[0])
return np.repeat(np.std(y, axis=axis), y.shape[0])

def cdf(self, y, theta):
def cdf(self, y: np.ndarray, theta: np.ndarray) -> np.ndarray:
mu, sigma = self.theta_to_params(theta)
return st.norm(mu, sigma).cdf(y)

def pdf(self, y, theta):
def pdf(self, y: np.ndarray, theta: np.ndarray) -> np.ndarray:
mu, sigma = self.theta_to_params(theta)
return st.norm(mu, sigma).pdf(y)

def ppf(self, q, theta):
def ppf(self, q: np.ndarray, theta: np.ndarray) -> np.ndarray:
mu, sigma = self.theta_to_params(theta)
return st.norm(mu, sigma).ppf(q)

def rvs(self, size, theta):
def rvs(self, size: int, theta: np.ndarray) -> np.ndarray:
mu, sigma = self.theta_to_params(theta)
return st.norm(mu, sigma).rvs((size, theta.shape[0])).T
Loading

0 comments on commit e7d8aed

Please sign in to comment.