Source code for so_magic.data.discretization

from abc import ABC, abstractmethod
import inspect
import attr
import pandas as pd
from so_magic.utils import SubclassRegistry


[docs]class DiscretizerInterface(ABC):
[docs] def discretize(self, *args, **kwargs): raise NotImplementedError
[docs]class AbstractDiscretizer(DiscretizerInterface):
[docs] def discretize(self, *args, **kwargs): raise NotImplementedError
[docs]@attr.s class BaseDiscretizer(AbstractDiscretizer): binner = attr.ib()
[docs] def discretize(self, *args, **kwargs): """Expects args: dataset, feature and kwargs; 'nb_bins'.""" datapoints = args[0] attribute = args[1] bins = args[2] try: output = self.binner.bin(datapoints.column(attribute), bins, **kwargs) except TypeError as type_error: msg = f'Table column being processed: {attribute}. Exception text: {str(type_error)}' raise TypeError(msg) from type_error return output
[docs]@attr.s class FeatureDiscretizer(BaseDiscretizer): feature = attr.ib(init=True)
[docs] def discretize(self, *args, **kwargs): """Expects args: dataset, nb_bins.""" return super().discretize(args[0], self.feature, args[1])
[docs]@attr.s class FeatureDiscretizerFactory: binner_factory = attr.ib(init=True)
[docs] def categorical(self, feature, **kwargs) -> FeatureDiscretizer: binner_type = 'same-length' if kwargs.get('quantisized', False): binner_type = 'quantisized' return FeatureDiscretizer(self.binner_factory.create_binner(binner_type), feature)
[docs] def numerical(self, feature, **kwargs) -> FeatureDiscretizer: binner_type = 'same-length' if kwargs.get('quantisized', False): binner_type = 'quantisized' return FeatureDiscretizer(self.binner_factory.create_binner(binner_type), feature)
#########################################
[docs]class BinnerInterface(ABC):
[docs] @abstractmethod def bin(self, values, bins): raise NotImplementedError
[docs]@attr.s class BaseBinner(BinnerInterface): algorithm = attr.ib()
[docs] def bin(self, values, bins): """It is assumed numerical (ratio or interval) variable or ordinal (not nominal) categorical variable.""" try: return self.algorithm.run(values, bins) except TypeError as type_error: raise TypeError(f'Exception text: {str(type_error)}. Possible reasons: preprocessing is needed to make sure' f' suitable values are places in missing entries and/or all entries are of the same type') \ from type_error
[docs]class BinnerClass(metaclass=SubclassRegistry): pass
[docs]class BinnerFactory: parent_class = BinnerClass
[docs] def equal_length_binner(self, *args, **kwargs) -> BaseBinner: """Binner that create bins of equal size (max_value - min_value)""" raise NotImplementedError
[docs] def quantisized_binner(self, *args, **kwargs) -> BaseBinner: """Binner that will adjust the bin sizes so that the observations are evenly distributed in the bins Raises: NotImplementedError: [description] Returns: BaseBinner: [description] """ raise NotImplementedError
[docs] def create_binner(self, *args, **kwargs) -> BaseBinner: raise NotImplementedError
[docs]class AlgorithmInterface(ABC):
[docs] @abstractmethod def run(self, *args, **kwargs): raise NotImplementedError
[docs]@attr.s class AlgorithmArguments: """An algorithms expected positional arguments.""" arg_types = attr.ib() default_values = attr.ib() _required_args = attr.ib(init=False, default=attr.Factory(lambda self: len(self.arg_types), takes_self=True))
[docs] def values(self, *args): if len(args) > len(self._required_args): raise AlgorithmArgumentsError(f'Given more than the supported naumber of arguments. ' f'{len(args)} > {len(self._required_args)}') missing = len(self._required_args) - len(args) computed_args_list = list(args) + self.default_values[-missing:] if not all(isinstance(arg_value, self.arg_types[i]) for i, arg_value in computed_args_list): raise AlgorithmArgumentsError('Type missmatch') return computed_args_list
[docs]@attr.s class AbstractAlgorithm(AlgorithmInterface, ABC): callback: callable = attr.ib() arguments: list = attr.ib(default=attr.Factory(list)) parameters: dict = attr.ib(default=attr.Factory(dict)) default_parameter_values = attr.ib(init=False, default=attr.Factory( lambda self: {k: v['value'] for k, v in self.parameters.items()}, takes_self=True)) _args = attr.ib(init=False, default=attr.Factory(list))
[docs]@attr.s class MagicAlgorithm(AbstractAlgorithm): _signature = attr.ib(init=False, default=attr.Factory(lambda self: inspect.signature(self.callback), takes_self=True)) _output = attr.ib(init=False, default=attr.Factory(dict))
[docs] def run(self, *args, **kwargs): if not len(args) == len(self.arguments): raise MagicAlgorithmError( f'Number of runtime positional arguments do not match the expected number of positional argumnets. ' f'Given {len(args)} arguments: [{", ".join(str(_) for _ in args)}]. Expected {len(self.arguments)} ' f'arguments: [{", ".join(str(_) for _ in self.arguments)}].') if not all(isinstance(argument, self.arguments[i]) for i, argument in enumerate(args)): raise MagicAlgorithmError(f'Bad positional argument for algorithm. Expected arguments with types ' f'[{", ".join(self.arguments)}]. Instead got [{", ".join(self.arguments)}].') self._args = list(args) self.update_parameters(**kwargs) result = self._run_callback() self._output['settings'] = self._get_settings(result) self._output['result'] = self._get_result(result) return self._output
def _run_callback(self): return self.callback(*self._args, **{k: v['value'] for k, v in self.parameters.items()}) @property def output(self): return self._output def _get_result(self, result): return result def _get_settings(self, _result): return { 'arguments': self._args, 'parameters': { param_name: param_data['value'] for param_name, param_data in self.parameters.items() }, }
[docs] def update_parameters(self, **kwargs): if not all(isinstance(parameter_value, self.parameters['type']) for parameter_name, parameter_value in kwargs if parameter_name in self.parameters): raise MagicAlgorithmParametersError( f'Bad algorithm parameters. Allowed parameters with types ' f'[{", ".join(f"{k}: {v}" for k, v in self.parameters.items())}]. ' f'Instead got [{", ".join(f"{k}: {v}" for k, v in kwargs.items())}].') self._update_params(**kwargs)
[docs] def set_default_parameters(self): self._update_params(**self.default_parameter_values)
def _update_params(self, **kwargs): for key, value in kwargs.items(): self.parameters[key]['value'] = value
[docs]class MagicAlgorithmError(Exception): pass
[docs]class MagicAlgorithmParametersError(Exception): pass
[docs]class AlgorithmArgumentsError(Exception): pass
[docs]def call_method(a_callable): def _call(_self, *args, **kwargs): return a_callable(*args, **kwargs) return _call
[docs]@attr.s class Discretizer(BaseDiscretizer): @property def algorithm(self): return self.binner.algorithm
[docs] @classmethod def from_algorithm(cls, alg): binner = BaseBinner(alg) return Discretizer(binner)
[docs]class BinningAlgorithm(metaclass=SubclassRegistry):
[docs] @classmethod def from_built_in(cls, algorithm_id): return cls.create(algorithm_id, pd.cut, # TODO replace with call to dataclass [object, object], { 'right': { 'type': bool, 'value': True, }, 'labels': { 'type': object, 'value': None }, 'retbins': { 'type': bool, 'value': True }, 'precision': { 'type': int, 'value': 3 }, 'include_lowest': { 'type': bool, 'value': False }, 'duplicates': { 'type': str, 'value': 'raise' }, } )
[docs]@BinningAlgorithm.register_as_subclass('pd.cut') class PDCutBinningAlgorithm(MagicAlgorithm): def _get_settings(self, result): return dict(super()._get_settings(result), **{'used_bins': result[1]}) def _get_result(self, result): if bool(self.parameters['retbins']): return super()._get_result(result)[0] return super()._get_result(result)