Source code for dae.pheno.prepare.measure_classifier

import enum
from typing import Any, cast

import numpy as np
from box import Box

from dae.pheno.common import InferenceConfig, MeasureType
from dae.pheno.utils.commons import remove_annoying_characters


[docs] class ClassifierReport: """Class used to collect clissifier reports.""" MAX_CHARS = 32 DISTRIBUTION_CUTOFF = 20 def __init__(self) -> None: self.instrument_name: str | None = None self.measure_name: str | None = None self.measure_type: str | None = None self.count_total: int | None = None self.count_with_values: int | None = None self.count_without_values: int | None = None self.count_with_numeric_values: int | None = None self.count_with_non_numeric_values: int | None = None self.count_unique_values: int | None = None self.count_unique_numeric_values: int | None = None self.value_max_len: int | None = None self.unique_values: list[Any] | None = None self.numeric_values: list[float | None] | np.ndarray | None = None self.distribution: Any = None self.min_value: int | None = None self.max_value: int | None = None self.values_domain: str | None = None self.rank: int | None = None self.db_name: str | None = None
[docs] def set_measure(self, measure: Box) -> "ClassifierReport": self.instrument_name = measure.instrument_name self.measure_name = measure.measure_name self.measure_type = measure.measure_type.name return self
[docs] @staticmethod def short_attributes() -> list[str]: return [ "instrument_name", "measure_name", "measure_type", "count_total", "count_with_values", "count_with_numeric_values", "count_with_non_numeric_values", "count_without_values", "count_unique_values", "count_unique_numeric_values", "value_max_len", ]
def __repr__(self) -> str: return self.log_line(short=True)
[docs] def log_line(self, *, short: bool = False) -> str: """Construct a log line in clissifier report.""" attributes = self.short_attributes() if not short: attributes.append("values_domain") values = [str(getattr(self, attr)).strip() for attr in attributes] values = [v.replace("\n", " ") for v in values] return "\t".join(values)
[docs] def is_nan(val: Any) -> bool: """Check if the passed value is a NaN.""" if val is None: return True if isinstance(val, str) and val.strip() == "": return True return type(val) in {float, np.float64, np.float32} and np.isnan(val)
[docs] class Convertible(enum.Enum): # pylint: disable=invalid-name nan = 0 numeric = 1 non_numeric = 2
[docs] def is_convertible_to_numeric(val: Any) -> Convertible: """Check if the passed string is convertible to number.""" if val is None: return Convertible.nan if isinstance(val, str): val = val.strip() if val == "": return Convertible.nan if isinstance(val, float) and np.isnan(val): return Convertible.nan if isinstance(val, bool): return Convertible.non_numeric if isinstance(val, np.bool_): return Convertible.non_numeric try: val = float(val) except ValueError: pass else: return Convertible.numeric return Convertible.non_numeric
[docs] def convert_to_numeric(val: Any) -> float | np.float_: """Convert passed value to float.""" if is_convertible_to_numeric(val) == Convertible.numeric: return float(val) return np.nan
[docs] def convert_to_string(val: Any) -> str | None: """Convert passed value to string.""" if is_nan(val): return None if isinstance(val, str): return str(remove_annoying_characters(val)) return str(val)
[docs] class MeasureClassifier: """Defines a measure classification report.""" def __init__(self, config: InferenceConfig): self.config = config
[docs] def classify(self, rep: ClassifierReport) -> MeasureType: """Classify a measure based on classification report.""" conf = self.config if ( conf.min_individuals is not None and rep.count_with_values is not None and rep.count_with_values < conf.min_individuals ): return MeasureType.raw non_numeric = ( 1.0 * cast(int, rep.count_with_non_numeric_values) ) / cast(int, rep.count_with_values) if non_numeric <= conf.non_numeric_cutoff: if ( rep.count_unique_numeric_values is not None and conf.continuous.min_rank is not None and rep.count_unique_numeric_values >= conf.continuous.min_rank ): return MeasureType.continuous if ( rep.count_unique_numeric_values is not None and conf.ordinal.min_rank is not None and rep.count_unique_numeric_values >= conf.ordinal.min_rank ): return MeasureType.ordinal return MeasureType.raw if ( rep.count_unique_values is not None and conf.categorical.min_rank is not None and conf.categorical.max_rank is not None and rep.count_unique_values >= conf.categorical.min_rank and rep.count_unique_values <= conf.categorical.max_rank # and rep.value_max_len <= conf.value_max_len ): return MeasureType.categorical return MeasureType.raw
[docs] def classification_reference_impl( measure_values: list[str | None], config: InferenceConfig, ) -> tuple[list[Any], ClassifierReport]: """Reference implementation for measure classification.""" report = ClassifierReport() unique_values: set[str] = set() numeric_values: list[float | None] = [] unique_numeric_values = set() report.count_total = len(measure_values) numeric_count = 0 measure_type = None none_count = 0 if config.measure_type is not None: measure_type = MeasureType.from_str(config.measure_type) for val in measure_values: if val is None: none_count += 1 numeric_values.append(None) continue unique_values.add(val) try: num_value = float(val) numeric_values.append(num_value) numeric_count += 1 unique_numeric_values.add(num_value) except ValueError: numeric_values.append(None) report.numeric_values = numeric_values report.count_with_values = len(measure_values) - none_count report.count_without_values = none_count report.count_with_numeric_values = numeric_count report.count_with_non_numeric_values = \ report.count_with_values - report.count_with_numeric_values report.unique_values = list(unique_values) report.count_unique_values = len(report.unique_values) report.count_unique_numeric_values = len(unique_numeric_values) assert ( report.count_total == report.count_with_values + report.count_without_values ) assert ( report.count_with_values == report.count_with_numeric_values + report.count_with_non_numeric_values ) classifier = MeasureClassifier(config) if measure_type is None: measure_type = classifier.classify(report) report.measure_type = measure_type non_null_numeric_values = list( filter(lambda x: x is not None, report.numeric_values)) if measure_type in {MeasureType.continuous, MeasureType.ordinal}: if len(non_null_numeric_values) == 0: raise ValueError( "Measure is set as numeric but has no numeric values!", ) report.min_value = np.min(cast(np.ndarray, non_null_numeric_values)) if isinstance(report.min_value, np.bool_): report.min_value = np.int8(report.min_value) report.max_value = np.max(cast(np.ndarray, non_null_numeric_values)) if isinstance(report.max_value, np.bool_): report.max_value = np.int8(report.max_value) report.values_domain = f"[{report.min_value}, {report.max_value}]" else: values = [v for v in report.unique_values if v.strip() != ""] report.values_domain = ", ".join(sorted(values)) report.rank = report.count_unique_values if measure_type in [MeasureType.ordinal, MeasureType.continuous]: assert len(measure_values) == len(numeric_values) return numeric_values, report return measure_values, report