# MIT License: Copyright (c) 2021 Lorenzo Loconte, Gennaro Gala, Federico Luzzi
from typing import Union, Type, List
from collections import deque
import numpy as np
from deeprob.spn.structure.leaf import LeafType, Leaf
[docs]def gvs_cols(
data: np.ndarray,
distributions: List[Type[Leaf]],
domains: List[Union[list, tuple]],
random_state: np.random.RandomState,
p: float = 5.0
) -> np.ndarray:
"""
Greedy Variable Splitting (GVS) independence test.
:param data: The data.
:param distributions: The distributions.
:param domains: The domains.
:param random_state: The random state.
:param p: The threshold for the G-Test.
:return: A partitioning of features.
:raises ValueError: If the leaf distributions are discrete and continuous.
"""
_, n_features = data.shape
rand_init = random_state.randint(0, n_features)
features_set = set(filter(lambda x: x != rand_init, range(n_features)))
dependent_features_set = {rand_init}
features_queue = deque()
features_queue.append(rand_init)
while features_queue:
feature = features_queue.popleft()
features_remove = set()
for other_feature in features_set:
if not gtest(data, feature, other_feature, distributions, domains, p=p):
features_remove.add(other_feature)
dependent_features_set.add(other_feature)
features_queue.append(other_feature)
features_set = features_set.difference(features_remove)
partition = np.zeros(n_features, dtype=np.int64)
partition[list(dependent_features_set)] = 1
return partition
[docs]def rgvs_cols(
data: np.ndarray,
distributions: List[Type[Leaf]],
domains: List[Union[list, tuple]],
random_state: np.random.RandomState,
p: float = 5.0
) -> np.ndarray:
"""
Random Greedy Variable Splitting (RGVS) independence test.
:param data: The data.
:param distributions: The distributions.
:param domains: The domains.
:param random_state: The random state.
:param p: The threshold for the G-Test.
:return: A partitioning of features.
:raises ValueError: If the leaf distributions are discrete and continuous.
"""
_, n_features = data.shape
k = int(max(np.sqrt(n_features), 2))
if k == n_features:
return gvs_cols(data, distributions, domains, random_state, p)
rand_perm = random_state.permutation(np.arange(n_features))[:k]
data_gvs = data[:, rand_perm]
distributions_gvs = []
domains_gvs = []
for e in rand_perm:
distributions_gvs.append(distributions[e])
domains_gvs.append(domains[e])
partition_gvs = gvs_cols(data_gvs, distributions_gvs, domains_gvs, random_state, p)
if (partition_gvs != 1).all():
return np.zeros(n_features, dtype=np.int64)
if random_state.rand() < 0.5:
# excluded in first cluster 0
partition = np.zeros(n_features, dtype=np.int64)
else:
# excluded in second cluster 1
partition = np.ones(n_features, dtype=np.int64)
partition[rand_perm] = partition_gvs
return partition
[docs]def wrgvs_cols(
data: np.ndarray,
distributions: List[Type[Leaf]],
domains: List[Union[list, tuple]],
random_state: np.random.RandomState,
p: float = 5.0
) -> np.ndarray:
"""
Wiser Random Greedy Variable Splitting (WRGVS) independence test.
:param data: The data.
:param distributions: The distributions.
:param domains: The domains.
:param random_state: The random state.
:param p: The threshold for the G-Test.
:return: A partitioning of features.
:raises ValueError: If the leaf distributions are discrete and continuous.
"""
_, n_features = data.shape
k = int(max(np.sqrt(n_features), 2))
if k == n_features:
return gvs_cols(data, distributions, domains, random_state, p)
rand_perm = random_state.permutation(np.arange(n_features))[:k]
data_gvs = data[:, rand_perm]
distributions_gvs = []
domains_gvs = []
for e in rand_perm:
distributions_gvs.append(distributions[e])
domains_gvs.append(domains[e])
partition_gvs = gvs_cols(data_gvs, distributions_gvs, domains_gvs, random_state, p)
if ((partition_gvs != 1).all()) or ((partition_gvs != 0).all()):
return np.zeros(n_features, dtype=np.int64)
part_0 = set(rand_perm[partition_gvs == 0])
part_1 = set(rand_perm[partition_gvs == 1])
part_0_el = random_state.choice(list(part_0), replace=False)
part_1_el = random_state.choice(list(part_1), replace=False)
feature_excluded = set(range(n_features)) - set(rand_perm)
for f_i in feature_excluded:
# g testing for deciding which cluster
g_val0 = gtest(data, f_i, part_0_el, distributions, domains, p, test=False)
g_val1 = gtest(data, f_i, part_1_el, distributions, domains, p, test=False)
if g_val0 > g_val1:
part_0.add(f_i)
else:
part_1.add(f_i)
partition = np.zeros(n_features, dtype=np.int64)
partition[list(part_1)] = 1
return partition
[docs]def gtest(
data: np.ndarray,
i: int,
j: int,
distributions: List[Type[Leaf]],
domains: List[Union[list, tuple]],
p: float = 5.0,
test: bool = True
) -> Union[bool, float]:
"""
The G-Test independence test between two features.
:param data: The data.
:param i: The index of the first feature.
:param j: The index of the second feature.
:param distributions: The distributions.
:param domains: The domains.
:param p: The threshold for the G-Test.
:param test: If the method is called as test (true) or as value of statistics (false), default True.
:return: False if the features are assumed to be dependent, True otherwise.
:raises ValueError: If the leaf distributions are discrete and continuous.
"""
n_samples = len(data)
x1, x2 = data[:, i], data[:, j]
if distributions[i].LEAF_TYPE == LeafType.DISCRETE and distributions[j].LEAF_TYPE == LeafType.DISCRETE:
b1 = domains[i] + [len(domains[i])]
b2 = domains[j] + [len(domains[j])]
elif distributions[i].LEAF_TYPE == LeafType.CONTINUOUS and distributions[j].LEAF_TYPE == LeafType.CONTINUOUS:
b1 = np.histogram_bin_edges(x1, bins='fd')
b2 = np.histogram_bin_edges(x2, bins='fd')
hist, _, _ = np.histogram2d(x1, x2, bins=[b1, b2])
else:
raise ValueError("Leaves distributions must be either discrete or continuous")
hist, _, _ = np.histogram2d(x1, x2, bins=[b1, b2])
# Compute G-test statistics
hist = hist.astype(np.float32) + np.finfo(np.float32).eps
m1 = np.sum(hist, axis=1, keepdims=True)
m2 = np.sum(hist, axis=0, keepdims=True)
e = m1 * m2 / n_samples
g_val = 2.0 * np.sum(hist * np.log(hist / e))
# Return test result
if test:
f1 = np.count_nonzero(m1)
f2 = np.count_nonzero(m2)
dof = (f1 - 1) * (f2 - 1)
p_thresh = 2.0 * dof * p
return g_val < p_thresh
# Return the value of G-test
return g_val