Source code for deeprob.spn.utils.partitioning

# MIT License: Copyright (c) 2021 Lorenzo Loconte, Gennaro Gala

from __future__ import annotations
import itertools
from typing import Optional, Tuple

import numpy as np


[docs]class Partition: def __init__( self, row_ids: list, col_ids: list, uncond_vars: list, parent_partition: Optional[Partition] = None, is_naive: Optional[bool] = False, is_conj: Optional[bool] = False, ): """ Create a partition, i.e. an object modeling a data slice (and some of its properties) by keeping track of its indices (i.e. row_ids and col_ids). :param row_ids: The row indices of the modeled slice. :param col_ids: The column indices of the modeled slice. :param uncond_vars: Ordered list of variables from which the conjunction variables will be extracted to horizontally split the current partition. :param parent_partition: The optional parent partition :param is_naive: If True and determinism is not required, a naive factorization will be learnt over the data slice modeled by the current partition; otherwise, if True and determinism is required, a disjunction will be learnt over the data slice modeled by the current partition. :param is_conj: True if the modeled slice is associated to a conjunction, i.e. every row in the slice is equal to the others. """ self.row_ids = np.array(row_ids) self.col_ids = np.array(col_ids) self.uncond_vars = list(uncond_vars) self.parent_partition = parent_partition self.set_parent_partition(parent_partition) self.sub_partitions = [] self.is_naive = is_naive self.is_conj = is_conj # discarded assignments, see build_leaf() in xpc.py self.disc_assignments = None
[docs] def set_parent_partition(self, parent_partition: Partition): """ Set the parent partition and update its sub_partitions attribute. :param parent_partition: The parent partition. """ if parent_partition is not None: parent_partition.sub_partitions.append(self)
[docs] def is_partitioned(self): """ :return: True if the partition is partitioned, False otherwise. """ return len(self.sub_partitions) != 0
[docs] def is_horizontally_partitioned(self): """ :return: True if the partition is horizontally partitioned, False otherwise. """ ret = False if self.is_partitioned(): ret = len(self.row_ids) > len(self.sub_partitions[0].row_ids) return ret
[docs] def get_slice(self, data: np.ndarray) -> np.ndarray: """ Slice the input data matrix according to self. :param data: The data to be sliced. :return: The data slice. """ return data[self.row_ids][:, self.col_ids]
[docs] def get_vertical_split(self) -> list[np.ndarray, np.ndarray]: """ If possible, split vertically the current partition. """ vertical_split = [] cond_vars = [col_id for col_id in self.col_ids if col_id not in self.uncond_vars] if len(cond_vars) != 0 and len(cond_vars) != len(self.col_ids): vertical_split = [np.asarray(cond_vars), np.asarray(self.uncond_vars)] return vertical_split
[docs] def get_conj_row_ids( self, data: np.ndarray, conj: list, min_part_inst: int, ) -> np.ndarray: """ Return the row ids of the instances satisfying the given conjunction. The row ids must be found within the slice modeled by the self partition. :param data: The input data. :param conj: Conjunction modeled as a list of two lists: the first contains the IDs of the variables, the second the related assignment. For example, [[8,3],[1,0]] models the conjunction X8=1 and X3=0. :param min_part_inst: the minimum number of instances allowed to return. :return: The row ids of the instances satisfying the given conjunction iff the number of such instances is greater or equal than the minimum number of instances allowed to return; otherwise, an empty array. """ if len(self.row_ids) < min_part_inst: conj_row_ids = np.empty(0, dtype=np.int32) else: conj_row_ids = self.row_ids.copy() for i in range(len(conj[0])): conj_row_ids = conj_row_ids[np.where(data[np.array(conj_row_ids), conj[0][i]] == conj[1][i])[0]] if len(conj_row_ids) < min_part_inst: conj_row_ids = np.empty(0, dtype=np.int32) break return conj_row_ids
[docs] def get_horizontal_split( self, data: np.ndarray, min_part_inst: int, conj_len: int, arity: int, sd: bool, random_state: np.random.RandomState ) -> Tuple[list, np.ndarray, list]: """ If possible, split horizontally the current partition. :param data: The input data matrix. :param min_part_inst: The minimum number of instances allowed per partition. :param conj_len: The conjunction length. :param arity: The maximum number of subpartitions for an horizontal partitioned partition. :param sd: True if the generated tree will be used to model a SD PC, False otherwise. :param random_state: The random state. """ if len(self.uncond_vars) < conj_len or len(self.row_ids) < 2 * min_part_inst: return [], np.array([]), [] uncond_vars = self.uncond_vars.copy() if not sd: random_state.shuffle(uncond_vars) conj_vars = uncond_vars[:conj_len] # list of all possible assignments for a conjunction with length conj_len assignments = [list(assignment) for assignment in itertools.product([0, 1], repeat=len(conj_vars))] random_state.shuffle(assignments) discarded_row_ids = self.row_ids.copy() conj_row_ids_l = [] for assignment in assignments: conj = [conj_vars, assignment] conj_row_ids = self.get_conj_row_ids(data, conj, min_part_inst) if len(conj_row_ids) == len(self.row_ids): return [], discarded_row_ids, conj_vars if len(conj_row_ids) != 0 and len(discarded_row_ids) - len(conj_row_ids) >= min_part_inst: discarded_row_ids = np.setdiff1d(discarded_row_ids, conj_row_ids) conj_row_ids_l.append(conj_row_ids) if len(conj_row_ids_l) == arity - 1 or len(discarded_row_ids) < 2 * min_part_inst: break if conj_row_ids_l: return conj_row_ids_l, discarded_row_ids, conj_vars return [], np.array([]), []
[docs]def generate_random_partitioning( data: np.ndarray, min_part_inst: int, n_max_parts: int, conj_len: int, arity: int, sd: bool, uncond_vars: list, random_state: np.random.RandomState ): """ Create a random partition tree. :param data: The input data matrix. :param min_part_inst: The minimum number of instances allowed per partition. :param n_max_parts: The maximum number of partitions in the tree. :param conj_len: The conjunction length. :param arity: The maximum number of subpartitions for an horizontal partitioned partition. :param sd: True if the generated tree will be used to model a SD PC, False otherwise. :param uncond_vars: Ordered list of variables from which the first *conj_len* ones are extracted as conjunction variables to partition the root partition. :param random_state: The random state. :return partition_root: The partition root of the tree. :return cl_parts_l: List containing the leaf partitions over which a CLTree will be learnt. :return conj_vars_l: List of lists. Every sublist contains the variables of a conjunction (e.g. [[3, 5]]). If a sublist occurs before another, then the former has been used first. There are no duplicates. :return n_partitions: The number of partitions in the generated tree. """ partition_root = Partition(row_ids=np.arange(data.shape[0]), col_ids=uncond_vars, uncond_vars=uncond_vars, parent_partition=None) n_partitions = 0 conj_vars_l = [] cl_parts_l = [] leaves = [partition_root] while leaves and n_partitions + len(leaves) < n_max_parts: # randomly pop a leaf partition part = leaves.pop(random_state.randint(len(leaves))) conj_row_ids_l, discarded_row_ids, conj_vars = \ part.get_horizontal_split(data, min_part_inst, conj_len, arity, sd, random_state) if len(discarded_row_ids): if conj_vars not in conj_vars_l: conj_vars_l.append(conj_vars) # this ensures a general definition of the list uncond_vars, preserving its order uncond_vars = [uv for uv in part.uncond_vars if uv not in conj_vars] part_buffer = [ Partition(row_ids=discarded_row_ids, col_ids=part.col_ids.copy(), uncond_vars=uncond_vars.copy(), parent_partition=part)] for conj_row_ids in conj_row_ids_l: part_buffer.append( Partition(row_ids=conj_row_ids, col_ids=part.col_ids.copy(), uncond_vars=uncond_vars.copy(), parent_partition=part)) discarded_assignments = \ {tuple(assignment) for assignment in itertools.product([0, 1], repeat=len(conj_vars))} for k in range(len(part_buffer)): part = part_buffer[k] vertical_split = part.get_vertical_split() if vertical_split: n_partitions += 1 is_conj = False if not k else True p = Partition(row_ids=part.row_ids.copy(), col_ids=vertical_split[0].copy(), uncond_vars=[], parent_partition=part, is_naive=True, is_conj=is_conj) if is_conj: discarded_assignments.remove(tuple(p.get_slice(data)[0])) leaves.append( Partition(row_ids=part.row_ids.copy(), col_ids=vertical_split[1].copy(), uncond_vars=vertical_split[1].copy(), parent_partition=part)) else: leaves.append(part) if part_buffer[0].sub_partitions: part_buffer[0].sub_partitions[0].disc_assignments = \ np.array(list(discarded_assignments)) else: n_partitions += 1 cl_parts_l.append(part) # in case the process ended because n_partitions + len(leaves) > n_max_parts n_partitions += len(leaves) cl_parts_l.extend(leaves) return partition_root, cl_parts_l, conj_vars_l, n_partitions