"""
Provide classes for grouping of individuals by some criteria.
This module provides functionality for grouping
individuals from a study or study group into various
sets based on what value they have in a given mapping.
"""
from __future__ import annotations
import logging
from collections.abc import Generator
from dataclasses import dataclass
from typing import Any, Literal
from pydantic import (
BaseModel,
ConfigDict,
)
from dae.pedigrees.families_data import FamiliesData
from dae.pedigrees.family import Person
from dae.pheno.pheno_data import MeasureType, PhenotypeData
from dae.variants.attributes import Sex
logger = logging.getLogger(__name__)
[docs]
class PersonSetConfig(BaseModel):
"""Configuration for a person set."""
model_config = ConfigDict(extra="forbid")
id: str
name: str
values: tuple[str, ...]
color: str
[docs]
class SourceConfig(BaseModel):
"""Configuration for a source."""
model_config = ConfigDict(extra="forbid")
from_: Literal["pedigree", "phenodb"]
source: str
[docs]
class PersonSetCollectionConfig(BaseModel):
"""Configuration for a collection of person sets."""
model_config = ConfigDict(extra="forbid")
id: str
name: str
sources: list[SourceConfig]
domain: list[PersonSetConfig]
default: PersonSetConfig
def _parse_psc_sources(
psc_id: str,
psc_config: dict[str, Any],
) -> list[SourceConfig]:
if "sources" not in psc_config:
raise ValueError(
f"No sources defined for person set collection: {psc_id}")
psc_sources = []
for source in psc_config["sources"]:
if "from" not in source:
raise ValueError(
f"No 'from' defined for source in person set collection: "
f"{psc_id}")
if "source" not in source:
raise ValueError(
f"No 'source' defined for source in person set collection: "
f"{psc_id}")
psc_sources.append(SourceConfig(from_=source["from"],
source=source["source"]))
if not psc_sources:
raise ValueError(
f"Empty sources defined for person set collection: {psc_id}")
return psc_sources
[docs]
def parse_person_set_config(
psc_id: str,
domain: dict[str, Any],
) -> PersonSetConfig:
"""Parse a person set configuration."""
if "id" not in domain:
raise ValueError(
f"No id defined for domain in person set collection: "
f"{psc_id}")
if "name" not in domain:
raise ValueError(
f"No name defined for domain in person set collection: "
f"{psc_id}")
if "values" not in domain:
raise ValueError(
f"No values defined for domain in person set collection: "
f"{psc_id}")
if "color" not in domain:
raise ValueError(
f"No color defined for domain in person set collection: "
f"{psc_id}")
return PersonSetConfig(
id=domain["id"],
name=domain["name"],
values=tuple(domain["values"]),
color=domain["color"],
)
def _parse_psc_domain(
psc_id: str,
psc_config: dict[str, Any],
psc_sources: list[SourceConfig],
) -> list[PersonSetConfig]:
if "domain" not in psc_config:
raise ValueError(
f"No domain defined for person set collection: {psc_id}")
psc_domain = []
for domain in psc_config["domain"]:
ps_config = parse_person_set_config(psc_id, domain)
if len(ps_config.values) != len(psc_sources):
raise ValueError(
f"Values count {ps_config.values} " # noqa: PD011
"mismatch for domain in person set collection: "
f"{psc_id}")
psc_domain.append(ps_config)
if not psc_domain:
logger.warning(
"Empty domain defined for person set collection: %s", psc_id)
return psc_domain
def _parse_psc_default(
psc_id: str,
psc_config: dict[str, Any],
) -> PersonSetConfig:
if "default" not in psc_config:
raise ValueError(
f"No default defined for person set collection: {psc_id}")
psc_default = psc_config["default"]
if "id" not in psc_default:
raise ValueError(
f"No id defined for default in person set collection: {psc_id}")
if "name" not in psc_default:
raise ValueError(
f"No name defined for default in person set collection: {psc_id}")
if "color" not in psc_default:
raise ValueError(
f"No color defined for default in person set collection: {psc_id}")
if "values" in psc_default:
raise ValueError(
f"Values shoud not be defined for default in "
f"person set collection: {psc_id}")
return PersonSetConfig(
values=(),
**psc_default,
)
[docs]
def parse_person_set_collection_config(
psc_config: dict[str, Any],
) -> PersonSetCollectionConfig:
"""Parse a person set collection configuration."""
if "id" not in psc_config:
raise ValueError(
"No id defined for person set collection configuration")
psc_id = psc_config["id"]
if "name" not in psc_config:
raise ValueError(
f"No name defined for person set collection: {psc_id}")
if psc_config["id"] != psc_id:
raise ValueError(
f"Person set collection id mismatch: {psc_id} != "
f"{psc_config['id']}")
psc_sources = _parse_psc_sources(psc_id, psc_config)
psc_domain = _parse_psc_domain(psc_id, psc_config, psc_sources)
psc_default = _parse_psc_default(psc_id, psc_config)
return PersonSetCollectionConfig(
id=psc_id,
name=psc_config["name"],
sources=psc_sources,
domain=psc_domain,
default=psc_default,
)
[docs]
def parse_person_set_collections_study_config(
config: dict[str, Any],
) -> dict[str, PersonSetCollectionConfig]:
"""Parse a person sets configuration."""
if "person_set_collections" not in config:
raise ValueError("Invalid person sets collections configuration")
pscs_config = config["person_set_collections"]
if "selected_person_set_collections" not in pscs_config:
raise ValueError("No person set collections selected")
psc_selected = pscs_config["selected_person_set_collections"]
result = {}
for psc_id in psc_selected:
if psc_id not in pscs_config:
raise ValueError(
f"Selected person set collection not found: {psc_id}")
psc_config = pscs_config[psc_id]
if "id" not in psc_config:
raise ValueError(
f"No id defined for person set collection: {psc_id}")
psc_config = parse_person_set_collection_config(psc_config)
result[psc_id] = psc_config
return result
[docs]
@dataclass
class ChildrenStats:
"""Statistics about children in a PersonSet."""
male: int
female: int
unspecified: int
parents: int
@property
def total(self) -> int:
return self.male + self.female + self.unspecified
[docs]
@dataclass
class ChildrenBySex:
"""Statistics about children in a PersonSet."""
male: set[tuple[str, str]]
female: set[tuple[str, str]]
unspecified: set[tuple[str, str]]
[docs]
@dataclass
class PersonSet:
"""Set of individuals mapped to a common value in the source."""
def __init__(
self, psid: str, name: str,
values: tuple[str, ...], color: str,
persons: dict[tuple[str, str], Person]):
self.id: str = psid # pylint: disable=invalid-name
self.name: str = name
self.values: tuple[str, ...] = values
self.color: str = color
assert all(not p.generated for p in persons.values())
self.persons: dict[tuple[str, str], Person] = persons
self._children_by_sex: ChildrenBySex | None = None
self._children_stats: ChildrenStats | None = None
self._children: list[Person] | None = None
self._children_count: int | None = None
def __repr__(self) -> str:
return f"PersonSet({self.id}: {self.name}, {len(self.persons)})"
def __len__(self) -> int:
return len(self.persons)
[docs]
def get_children(self) -> list[Person]:
"""Return all children in the person set."""
if self._children is None:
self._children = []
for person in self.persons.values():
if person.is_child():
self._children.append(person)
return self._children
[docs]
def get_children_count(self) -> int:
if self._children_count is None:
self._children_count = len(self.get_children())
return self._children_count
[docs]
def get_children_by_sex(self) -> ChildrenBySex:
"""Return all children in the person set splitted by sex."""
if self._children_by_sex is None:
self._children_by_sex = ChildrenBySex(
set(), set(), set(),
)
for child in self.get_children():
if child.sex == Sex.M:
self._children_by_sex.male.add(child.fpid)
elif child.sex == Sex.F:
self._children_by_sex.female.add(child.fpid)
else:
assert child.sex == Sex.U
self._children_by_sex.unspecified.add(child.fpid)
assert self._children_by_sex is not None
return self._children_by_sex
[docs]
def get_children_stats(self) -> ChildrenStats:
"""Return statistics about children in the person set."""
if self._children_stats is None:
children_by_sex = self.get_children_by_sex()
self._children_stats = ChildrenStats(
len(children_by_sex.male),
len(children_by_sex.female),
len(children_by_sex.unspecified),
len(list(self.get_parents())),
)
assert self._children_stats is not None
return self._children_stats
[docs]
def get_parents(self) -> Generator[Person, None, None]:
for person in self.persons.values():
if person.is_parent():
yield person
[docs]
def to_json(self) -> dict[str, Any]:
return {
"id": self.id,
"name": self.name,
"values": self.values,
"color": self.color,
"person_ids": list(self.persons.keys()),
}
[docs]
@staticmethod
def from_json(json: dict[str, Any], families: FamiliesData) -> PersonSet:
"""Construct person set from a JSON dict."""
real_persons = families.real_persons
persons = {
pid: real_persons[pid]
for pid in json["person_ids"] if pid in real_persons
}
return PersonSet(
json["id"],
json["name"],
json["values"],
json["color"],
persons,
)
[docs]
class PersonSetCollection:
"""The collection of all possible person sets in a given source."""
def __init__(
self,
config: PersonSetCollectionConfig,
person_sets: dict[str, PersonSet],
default: PersonSet,
families: FamiliesData):
self.config = config
self.id: str = config.id
self.name: str = config.name
self.sources = self.config.sources
self.person_sets: dict[str, PersonSet] = person_sets
self.default: PersonSet = default
self.person_sets[default.id] = default
self.families: FamiliesData = families
def __repr__(self) -> str:
return f"PersonSetCollection({self.id}: {self.person_sets})"
def __len__(self) -> int:
return len(self.person_sets)
[docs]
def is_pedigree_only(self) -> bool:
return all(s.from_ == "pedigree" for s in self.sources)
@staticmethod
def _produce_sets(
config: PersonSetCollectionConfig,
) -> dict[str, PersonSet]:
"""
Produce initial PersonSet instances.
Initializes a dictionary of person set IDs mapped to
empty PersonSet instances from a given configuration.
"""
result = {}
for ps_config in config.domain:
result[ps_config.id] = PersonSet(
ps_config.id,
name=ps_config.name,
values=ps_config.values,
color=ps_config.color,
persons={},
)
return result
@staticmethod
def _produce_default_person_set(
config: PersonSetCollectionConfig,
) -> PersonSet:
default_config = config.default
return PersonSet(
default_config.id,
name=default_config.name,
values=(),
color=default_config.color,
persons={},
)
[docs]
@staticmethod
def get_person_color(
person: Person, person_set_collection: PersonSetCollection,
) -> str:
"""Get the hex color value for a Person in a PersonSetCollection."""
if person.generated:
return "#E0E0E0"
if person_set_collection is None:
return "#FFFFFF"
matching_person_set = person_set_collection.get_person_set_of_person(
person.fpid,
)
if matching_person_set is not None:
return matching_person_set.color
logger.warning(
"Person <%s> could not be found in any"
" domain of <%s>!",
person.fpid, person_set_collection.id,
)
return "#AAAAAA"
[docs]
@staticmethod
def remove_empty_person_sets(
person_set_collection: PersonSetCollection,
) -> PersonSetCollection:
"""Remove all empty person sets in a PersonSetCollection in place."""
empty_person_sets = set()
for set_id, person_set in person_set_collection.person_sets.items():
if len(person_set.persons) == 0:
empty_person_sets.add(set_id)
logger.debug(
"empty person sets to remove from person set collection <%s>: %s",
person_set_collection.id, empty_person_sets)
for set_id in empty_person_sets:
del person_set_collection.person_sets[set_id]
return person_set_collection
[docs]
def collect_person_collection_attributes(
self, person: Person, pheno_db: PhenotypeData | None,
) -> tuple[str, ...]:
"""Collect all configured attributes for a Person."""
values = []
for source in self.sources:
if source.from_ == "pedigree":
value = person.get_attr(source.source)
# Convert to string since some of the person's
# attributes can be of an enum type
if value is not None:
value = str(value)
elif source.from_ == "phenodb" and pheno_db is not None:
assert pheno_db.get_measure(source.source).measure_type \
in {MeasureType.categorical, MeasureType.ordinal}, (
f"Continuous measures not allowed in person sets! "
f"({source.source})")
pheno_values = list(pheno_db.get_people_measure_values(
[source.source],
person_ids=[person.person_id],
))
if len(pheno_values) == 0:
value = None
else:
value = pheno_values[0][source.source]
else:
raise ValueError(f"Invalid source type {source.from_}!")
values.append(value)
return tuple(values)
[docs]
@staticmethod
def from_families(
psc_config: PersonSetCollectionConfig,
families_data: FamiliesData,
pheno_db: PhenotypeData | None = None,
) -> PersonSetCollection:
"""Produce a PersonSetCollection from a config and pedigree."""
collection = PersonSetCollection(
psc_config,
PersonSetCollection._produce_sets(psc_config),
PersonSetCollection._produce_default_person_set(psc_config),
families_data,
)
value_to_id = {
ps_config.values: ps_config.id # noqa: PD011
for ps_config in psc_config.domain
}
logger.debug("person set collection value_to_id: %s", value_to_id)
for person_id, person in families_data.real_persons.items():
assert not person.missing
value = collection.collect_person_collection_attributes(
person, pheno_db)
if value not in value_to_id:
collection.default.persons[person_id] = person
else:
set_id = value_to_id[value]
collection.person_sets[set_id].persons[person_id] = person
return PersonSetCollection.remove_empty_person_sets(collection)
[docs]
@staticmethod
def merge_configs(
person_set_collections: list[PersonSetCollection],
) -> PersonSetCollectionConfig:
"""
Merge the configurations of a list of PersonSetCollection objects.
Only supports merging PersonSetCollection objects with matching ids.
The method will not merge the PersonSet objects' values.
"""
assert len(person_set_collections) > 0
collections_iterator = iter(person_set_collections)
first = next(collections_iterator)
result: dict[str, Any] = {}
result["id"] = first.id
result["name"] = first.name
sources = [{
"from": "pedigree",
"source": first.id,
}]
result["sources"] = sources
result["default"] = {
"id": first.default.id,
"name": first.default.name,
"color": first.default.color,
}
domain = {}
for person_set in first.person_sets.values():
result_def = {
"id": person_set.id,
"name": person_set.name,
"values": list(person_set.values),
"color": person_set.color,
}
domain[person_set.id] = result_def
for collection in collections_iterator:
if result["id"] != collection.id:
logger.error(
"trying to merge different type of collections: %s <-> %s",
collection.id, result["id"])
raise ValueError(
"trying to merge different type of collections")
for person_set in collection.person_sets.values():
if person_set.id in domain:
# check if this person set is compatible
# with the existing one
pass
else:
result_def = {
"id": person_set.id,
"name": person_set.name,
"values": list(person_set.values),
"color": person_set.color,
}
domain[person_set.id] = result_def
if first.default.id in domain:
del domain[first.default.id]
result["domain"] = [
domain[vid] for vid in sorted(domain.keys())
]
return parse_person_set_collection_config(result)
[docs]
def get_person_set(
self, person_id: tuple[str, str],
) -> PersonSet | None:
for person_set in self.person_sets.values():
if person_id in person_set.persons:
return person_set
return None
[docs]
def get_person_set_of_person(
self, fpid: tuple[str, str],
) -> PersonSet | None:
"""Retrieve the PersonSet associated with the given person identifier.
Args:
fpid (tuple[str, str]): The person identifier consisting of two
strings - family ID and person ID.
Returns:
Optional[PersonSet]: The PersonSet associated with the given
person identifier, or None if not found.
"""
result = self.get_person_set(fpid)
if result is not None:
return result
return None
[docs]
@staticmethod
def combine(
collections: list[PersonSetCollection],
families: FamiliesData,
) -> PersonSetCollection:
"""Combine a list of PersonSetCollection objects into a single one."""
if len(collections) == 0:
raise ValueError("can't combine empty list of collections")
if len(collections) == 1:
return collections[0]
config = PersonSetCollection.merge_configs(collections)
result = PersonSetCollection(
config,
PersonSetCollection._produce_sets(config),
PersonSetCollection._produce_default_person_set(config),
families)
for person_id, person in families.real_persons.items():
person_set = None
for psc in collections:
person_set = psc.get_person_set(person_id)
if person_set is not None:
break
if person_set is not None:
result.person_sets[person_set.id].persons[person_id] = person
else:
result.default.persons[person_id] = person
return PersonSetCollection.remove_empty_person_sets(result)
[docs]
def config_json(self) -> dict[str, Any]:
"""Produce a JSON configuration for this PersonSetCollection object."""
domain = []
for person_set in self.person_sets.values():
if self.default.id == person_set.id:
continue
domain.append({
"id": person_set.id,
"name": person_set.name,
"values": person_set.values, # noqa: PD011
"color": person_set.color,
})
sources = [
{"from": s.from_, "source": s.source}
for s in self.sources
]
return {
"id": self.id,
"name": self.name,
"sources": sources,
"domain": domain,
"default": {
"id": self.default.id,
"name": self.default.name,
"color": self.default.color,
},
}
[docs]
def legend_json(self) -> list[dict[str, Any]]:
return [
{
"id": person_set.id,
"name": person_set.name,
"color": person_set.color,
}
for person_set in self.person_sets.values()
]
[docs]
def domain_json(self) -> dict[str, Any]:
"""Produce a JSON to represent domain of this PersonSetCollection."""
domain = [
{
"id": person_set.id,
"name": person_set.name,
"color": person_set.color,
}
for person_set in self.person_sets.values()
]
return {
"id": self.id,
"name": self.name,
"domain": domain,
}
[docs]
def get_stats(self) -> dict[str, dict[str, int]]:
"""
Return a dictionary with statistics for each PersonSet.
The statistics are a dictionary containing the amount of parents
and children in the set.
"""
result = {}
for set_id, person_set in self.person_sets.items():
children_stats = person_set.get_children_stats()
parents = children_stats.parents
children = children_stats.total
result[set_id] = {
"parents": parents,
"children": children,
}
return result
[docs]
def query_fpids(self, query: PSCQuery) -> set[tuple[str, str]] | None:
"""Query the PersonSetCollection for the selected person sets."""
if query.psc_id != self.id:
raise ValueError(
f"Query for PersonSetCollection {query.psc_id} "
f"on PersonSetCollection {self.id}")
all_person_sets = set(self.person_sets.keys())
if all_person_sets & query.selected_person_sets == all_person_sets:
# Everything is selected
return None
result: set[tuple[str, str]] = set()
for set_id in query.selected_person_sets:
if set_id not in self.person_sets:
continue
result.update(self.person_sets[set_id].persons.keys())
return result
[docs]
def query_person_ids(self, query: PSCQuery) -> set[str] | None:
"""Query the PersonSetCollection for the selected person sets."""
fpids = self.query_fpids(query)
if fpids is None:
return None
return {fpid[1] for fpid in fpids}
[docs]
@dataclass
class PSCQuery:
"""Person set collection query."""
psc_id: str
selected_person_sets: set[str]