# MIT License: Copyright (c) 2021 Lorenzo Loconte, Gennaro Gala
import abc
from enum import Enum
from typing import Optional, Union, List
import numpy as np
import scipy.stats as ss
from deeprob.utils.data import check_data_dtype
from deeprob.spn.structure.node import Node
[docs]class LeafType(Enum):
"""
The type of the distribution leaf.
It can be either discrete or continuous.
"""
DISCRETE = 1
CONTINUOUS = 2
[docs]class Leaf(Node):
LEAF_TYPE = None
def __init__(self, scope: Union[int, List[int]]):
"""
Initialize a leaf node given its scope.
:param scope: The scope of the leaf.
:param kwargs: Additional arguments.
"""
super().__init__([scope] if isinstance(scope, int) else scope)
[docs] @abc.abstractmethod
def em_init(self, random_state: np.random.RandomState):
"""
Random initialize the leaf's parameters for Expectation-Maximization (EM).
:param random_state: The random state.
"""
[docs] @abc.abstractmethod
def em_step(self, stats: np.ndarray, data: np.ndarray, step_size: float):
"""
Compute a batch Expectation-Maximization (EM) step.
:param stats: The sufficient statistics of each sample.
:param data: The data regarding random variables of the leaf.
:param step_size: The step size of update.
"""
[docs] @abc.abstractmethod
def fit(self, data: np.ndarray, domain: Union[list, tuple], **kwargs):
"""
Fit the distribution parameters given the domain and some training data.
:param data: The training data.
:param domain: The domain of the distribution leaf.
:param kwargs: Optional parameters.
:raises ValueError: If a parameter is out of domain.
"""
[docs] @abc.abstractmethod
def likelihood(self, x: np.ndarray) -> np.ndarray:
"""
Compute the likelihood of the distribution leaf given some input.
:param x: The inputs.
:return: The resulting likelihoods.
"""
[docs] @abc.abstractmethod
def log_likelihood(self, x: np.ndarray) -> np.ndarray:
"""
Compute the logarithmic likelihood of the distribution leaf given some input.
:param x: The inputs.
:return: The resulting log-likelihoods.
"""
[docs] @abc.abstractmethod
def mpe(self, x: np.ndarray) -> np.ndarray:
"""
Compute the maximum at posteriori values.
:param x: The inputs.
:return: The distribution's maximum at posteriori values.
"""
[docs] @abc.abstractmethod
def sample(self, x: np.ndarray) -> np.ndarray:
"""
Sample from the leaf distribution.
:param x: The samples with possible NaN values.
:return: The completed samples.
"""
[docs] @abc.abstractmethod
def moment(self, k: int = 1) -> float:
"""
Compute the moment of a given order.
:param k: The order of the moment.
:return: The moment of order k.
"""
[docs] @abc.abstractmethod
def params_count(self) -> int:
"""
Get the number of parameters of the distribution leaf.
:return: The number of parameters.
"""
[docs] @abc.abstractmethod
def params_dict(self) -> dict:
"""
Get a dictionary representation of the distribution parameters.
:return: A dictionary containing the distribution parameters.
"""
[docs]class Bernoulli(Leaf):
LEAF_TYPE = LeafType.DISCRETE
def __init__(self, scope: int, p: float = 0.5):
"""
Initialize a Bernoulli leaf node given its scope.
:param scope: The scope of the leaf.
:param p: The Bernoulli probability.
:raises ValueError: If a parameter is out of domain.
"""
super().__init__(scope)
if p < 0.0 or p > 1.0:
raise ValueError("The Bernoulli probability must be in [0, 1]")
self.p = p
[docs] def fit(self, data: np.ndarray, domain: list, alpha: float = 0.1, **kwargs):
"""
Fit the distribution parameters given the domain and some training data.
:param data: The training data.
:param domain: The domain of the distribution leaf.
:param alpha: The Laplace smoothing factor.
:param kwargs: Optional parameters.
:raises ValueError: If a parameter is out of domain.
"""
if domain != [0, 1]:
raise ValueError("The domain must be binary for a Bernoulli distribution")
if alpha < 0.0:
raise ValueError("The Laplace smoothing factor must be non-negative")
# Check the data dtype
data = check_data_dtype(data, dtype=np.float32)
# Estimate using Laplace smoothing
self.p = (np.sum(data) + alpha) / (len(data) + 2 * alpha)
[docs] def em_init(self, random_state: np.random.RandomState):
self.p = random_state.rand()
[docs] def em_step(self, stats: np.ndarray, data: np.ndarray, step_size: float):
alpha = np.finfo(np.float16).eps # Use a very small Laplace smoothing factor
data = np.squeeze(data, axis=1)
total_stats = np.sum(stats)
p = (np.dot(stats, data) + alpha) / (total_stats + 2 * alpha)
# Update the parameters
self.p = (1.0 - step_size) * self.p + step_size * p
[docs] def likelihood(self, x: np.ndarray) -> np.ndarray:
ls = np.ones([len(x), 1], dtype=np.float32)
mask = np.isnan(x)
ls[~mask] = ss.bernoulli.pmf(x[~mask], self.p)
return ls
[docs] def log_likelihood(self, x: np.ndarray) -> np.ndarray:
lls = np.zeros([len(x), 1], dtype=np.float32)
mask = np.isnan(x)
lls[~mask] = ss.bernoulli.logpmf(x[~mask], self.p)
return lls
[docs] def mpe(self, x: np.ndarray) -> np.ndarray:
x = np.copy(x)
mask = np.isnan(x)
x[mask] = 0 if self.p < 0.5 else 1
return x
[docs] def sample(self, x: np.ndarray) -> np.ndarray:
x = np.copy(x)
mask = np.isnan(x)
x[mask] = ss.bernoulli.rvs(self.p, size=np.count_nonzero(mask))
return x
[docs] def moment(self, k: int = 1) -> float:
return ss.bernoulli.moment(k, self.p)
[docs] def params_count(self):
return 1
[docs] def params_dict(self):
return {'p': self.p}
[docs]class Categorical(Leaf):
LEAF_TYPE = LeafType.DISCRETE
def __init__(
self,
scope: int,
categories: Optional[Union[List, np.ndarray]] = None,
probabilities: Optional[Union[List, np.ndarray]] = None
):
"""
Initialize a Categorical leaf node given its scope.
:param scope: The scope of the leaf.
:param categories: The possible categories.
:param probabilities: The probabilities associated to each category.
"""
super().__init__(scope)
self.categories = None
self.probabilities = None
self.distribution = None
if categories is not None and probabilities is not None:
if len(categories) != len(probabilities):
raise ValueError("Each category must be associated a probability")
if not np.isclose(np.sum(probabilities), 1.0):
raise ValueError("Probabilities parameter must sum up to 1")
if isinstance(categories, list):
categories = np.array(categories, np.int64)
if isinstance(probabilities, list):
probabilities = np.array(probabilities, np.float32)
self.categories = np.array(categories, np.int64)
self.probabilities = np.array(probabilities, np.float32)
self.distribution = ss.rv_discrete(values=(self.categories, self.probabilities))
elif categories is not None or probabilities is not None:
raise ValueError("Partial defined parameters (categories, probabilities) are not handled")
[docs] def fit(self, data: np.ndarray, domain: list, alpha: float = 0.1, **kwargs):
"""
Fit the distribution parameters given the domain and some training data.
:param data: The training data.
:param domain: The domain of the distribution leaf.
:param alpha: The Laplace smoothing factor.
:param kwargs: Optional parameters.
:raises ValueError: If a parameter is out of domain.
"""
if not isinstance(domain, list):
raise ValueError("The domain must be categorical for a Categorical distribution")
if alpha < 0.0:
raise ValueError("The Laplace smoothing factor must be non-negative")
self.probabilities = np.empty(len(domain), np.float32)
for i, d in enumerate(domain):
self.probabilities[i] = (len(data[data == d]) + alpha) / (len(data) + len(domain) * alpha)
self.categories = np.array(domain, np.int64)
self.distribution = ss.rv_discrete(values=(self.categories, self.probabilities))
[docs] def em_init(self, random_state: np.random.RandomState):
"""
Random initialize the leaf's parameters for Expectation-Maximization (EM).
:param random_state: The random state.
:raises ValueError: If the categories are not initialized.
"""
if self.categories is None:
raise ValueError("Categorical leaf distribution is not initialized")
# Initialize the categories probabilities using a dirichlet distribution
self.probabilities = random_state.dirichlet(np.ones(len(self.categories)))
self.distribution = ss.rv_discrete(values=(self.categories, self.probabilities))
[docs] def em_step(self, stats: np.ndarray, data: np.ndarray, step_size: float):
alpha = np.finfo(np.float16).eps # Use a very small Laplace smoothing factor
data = np.squeeze(data, axis=1)
total_stats = np.sum(stats)
# Compute the probabilities for each category
probabilities = np.empty(len(self.categories), np.float32)
for i, d in enumerate(self.categories):
probabilities[i] = (np.sum(stats[data == d]) + alpha) / (total_stats + len(self.categories) * alpha)
# Update the parameters
self.probabilities = (1.0 - step_size) * self.probabilities + step_size * probabilities
self.distribution = ss.rv_discrete(values=(self.categories, self.probabilities))
[docs] def likelihood(self, x: np.ndarray) -> np.ndarray:
ls = np.ones([len(x), 1], dtype=np.float32)
mask = np.isnan(x)
ls[~mask] = self.distribution.pmf(x[~mask].astype(np.int64, copy=False))
return ls
[docs] def log_likelihood(self, x: np.ndarray) -> np.ndarray:
lls = np.zeros([len(x), 1], dtype=np.float32)
mask = np.isnan(x)
lls[~mask] = self.distribution.logpmf(x[~mask].astype(np.int64, copy=False))
return lls
[docs] def mpe(self, x: np.ndarray) -> np.ndarray:
x = np.copy(x)
mask = np.isnan(x)
x[mask] = self.categories[self.probabilities.argmax()]
return x
[docs] def sample(self, x: np.ndarray) -> np.ndarray:
x = np.copy(x)
mask = np.isnan(x)
x[mask] = self.distribution.rvs(size=np.count_nonzero(mask))
return x
[docs] def moment(self, k: int = 1) -> float:
return self.distribution.moment(k)
[docs] def params_count(self) -> int:
return 2 * len(self.categories)
[docs] def params_dict(self) -> dict:
if self.distribution is None:
return {'categories': None, 'probabilities': None}
return {'categories': self.categories, 'probabilities': self.probabilities}
[docs]class Isotonic(Leaf):
LEAF_TYPE = LeafType.CONTINUOUS
def __init__(
self,
scope: int,
densities: Optional[Union[List[float], np.ndarray]] = None,
breaks: Optional[Union[List[float], np.ndarray]] = None
):
"""
Initialize a histogram-Isotonic leaf node given its scope.
:param scope: The scope of the leaf.
:param densities: The densities. They must sum up to one.
:param breaks: The breaks values, such that len(breaks) == len(densities) + 1.
:raises ValueError: If a parameter is out of domain.
"""
super().__init__(scope)
self.densities = None
self.breaks = None
self.distribution = None
if densities is not None and breaks is not None:
if len(breaks) != len(densities) + 1:
raise ValueError("Invalid histogram parameters shapes")
if not np.isclose(np.sum(densities), 1.0):
raise ValueError("Densities parameter must sum up to 1")
if isinstance(densities, list):
densities = np.array(densities, np.float32)
if isinstance(breaks, list):
breaks = np.array(breaks, np.float32)
self.densities = densities
self.breaks = breaks
self.distribution = ss.rv_histogram(histogram=(densities, breaks))
elif densities is not None or breaks is not None:
raise ValueError("Partial defined parameters (densities, breaks) are not handled")
[docs] def fit(self, data: np.ndarray, domain: tuple, alpha: float = 0.1, **kwargs):
"""
Fit the distribution parameters given the domain and some training data.
:param data: The training data.
:param domain: The domain of the distribution leaf.
:param alpha: The Laplace smoothing factor.
:param kwargs: Optional parameters.
:raises ValueError: If a parameter is out of domain.
"""
if not isinstance(domain, tuple):
raise ValueError("The domain must be continuous for an Isotonic distribution")
if alpha < 0.0:
raise ValueError("The Laplace smoothing factor must be non-negative")
histogram, breaks = np.histogram(data, bins='fd')
# Apply Laplace smoothing and obtain the densities
densities = (histogram + alpha) / (len(data) + len(histogram) * alpha)
densities = densities.astype(np.float32, copy=False)
breaks = breaks.astype(np.float32, copy=False)
# Build the distribution
self.densities = densities
self.breaks = breaks
self.distribution = ss.rv_histogram(histogram=(densities, breaks))
[docs] def em_init(self, random_state: np.random.RandomState):
raise NotImplementedError("EM parameters initialization not yet implemented for Isotonic distributions")
[docs] def em_step(self, stats: np.ndarray, data: np.ndarray, step_size: float):
raise NotImplementedError("EM step not yet implemented for Isotonic distributions")
[docs] def likelihood(self, x: np.ndarray) -> np.ndarray:
ls = np.ones([len(x), 1], dtype=np.float32)
mask = np.isnan(x)
ood_mask = ~mask & ((x <= self.distribution.a) | (x >= self.distribution.b))
ls[~mask] = self.distribution.pdf(x[~mask])
ls[ood_mask] = np.finfo(np.float32).eps
return ls
[docs] def log_likelihood(self, x: np.ndarray) -> np.ndarray:
lls = np.zeros([len(x), 1], dtype=np.float32)
mask = np.isnan(x)
ood_mask = ~mask & ((x <= self.distribution.a) | (x >= self.distribution.b))
lls[~mask] = self.distribution.logpdf(x[~mask])
lls[ood_mask] = np.log(np.finfo(np.float64).eps)
return lls
[docs] def mpe(self, x: np.ndarray) -> np.ndarray:
x = np.copy(x)
mask = np.isnan(x)
idx = np.argmax(self.densities)
x[mask] = (self.breaks[idx] + self.breaks[idx + 1]) / 2.0
return x
[docs] def sample(self, x: np.ndarray) -> np.ndarray:
x = np.copy(x)
mask = np.isnan(x)
x[mask] = self.distribution.ppf(q=np.random.rand(np.count_nonzero(mask)))
return x
[docs] def moment(self, k: int = 1) -> np.ndarray:
return self.distribution.moment(k)
[docs] def params_count(self) -> int:
return 2 * len(self.densities) + 1
[docs] def params_dict(self) -> dict:
if self.distribution is None:
return {'densities': None, 'breaks': None}
return {'densities': self.densities, 'breaks': self.breaks}
[docs]class Gaussian(Leaf):
LEAF_TYPE = LeafType.CONTINUOUS
def __init__(self, scope: int, mean: float = 0.0, stddev: float = 1.0):
"""
Initialize a Gaussian leaf node given its scope.
:param scope: The scope of the leaf.
:param mean: The mean parameter.
:param stddev: The standard deviation parameter.
:raises ValueError: If a parameter is out of domain.
"""
super().__init__(scope)
if stddev <= 1e-5:
raise ValueError("The standard deviation of a Gaussian must be greater than 1e-5")
self.mean = mean
self.stddev = stddev
[docs] def fit(self, data: np.ndarray, domain: tuple, **kwargs):
if not isinstance(domain, tuple):
raise ValueError("The domain must be continuous for a Gaussian distribution")
self.mean, self.stddev = ss.norm.fit(data)
self.stddev = max(self.stddev, 1e-5)
[docs] def em_init(self, random_state: np.random.RandomState):
self.mean = 1e-1 * random_state.randn()
self.stddev = 0.5 + 1e-1 * np.tanh(random_state.randn())
[docs] def em_step(self, stats: np.ndarray, data: np.ndarray, step_size: float):
data = np.squeeze(data, axis=1)
total_stats = np.sum(stats) + np.finfo(np.float32).eps
mean = np.sum(stats * data) / total_stats
stddev = np.sqrt(np.sum(stats * (data - mean) ** 2.0) / total_stats)
stddev = max(stddev, 1e-5)
# Update the parameters
self.mean = (1.0 - step_size) * self.mean + step_size * mean
self.stddev = (1.0 - step_size) * self.stddev + step_size * stddev
[docs] def likelihood(self, x: np.ndarray) -> np.ndarray:
ls = np.ones([len(x), 1], dtype=np.float32)
mask = np.isnan(x)
ls[~mask] = ss.norm.pdf(x[~mask], self.mean, self.stddev)
return ls
[docs] def log_likelihood(self, x: np.ndarray) -> np.ndarray:
lls = np.zeros([len(x), 1], dtype=np.float32)
mask = np.isnan(x)
lls[~mask] = ss.norm.logpdf(x[~mask], self.mean, self.stddev)
return lls
[docs] def mpe(self, x: np.ndarray) -> np.ndarray:
x = np.copy(x)
mask = np.isnan(x)
x[mask] = self.mean
return x
[docs] def sample(self, x: np.ndarray) -> np.ndarray:
x = np.copy(x)
mask = np.isnan(x)
x[mask] = ss.norm.rvs(self.mean, self.stddev, size=np.count_nonzero(mask))
return x
[docs] def moment(self, k: int = 1) -> float:
return ss.norm.moment(k, self.mean, self.stddev)
[docs] def params_count(self) -> int:
return 2
[docs] def params_dict(self) -> dict:
return {
'mean': self.mean,
'stddev': self.stddev
}