"""Classes and helper functions to represent pedigree layout."""
# pylint: disable=invalid-name
from __future__ import annotations
import logging
import re
from collections import defaultdict, namedtuple
from dataclasses import dataclass
from functools import reduce
from typing import Any, cast
from dae.pedigrees.family import Family
from dae.pedigrees.pedigrees import (
FamilyConnections,
Individual,
IntervalForVertex,
MatingUnit,
SandwichSolver,
)
logger = logging.getLogger(__name__)
_LAYOUT_REGEX = re.compile(r"(?P<rank>\d):(?P<x>\d*\.?\d+),(?P<y>\d*\.?\d+)")
[docs]
@dataclass
class Point:
# pylint: disable=invalid-name
x: float
y: float
[docs]
def layout_parser(layout: str) -> dict[str, int | float] | None:
"""Parse layout string."""
layout_groups = _LAYOUT_REGEX.search(layout)
if layout_groups:
parsed = layout_groups.groupdict()
result: dict[str, int | float] = {}
result["rank"] = int(parsed["rank"])
result["x"] = float(parsed["x"])
result["y"] = float(parsed["y"])
return result
return None
[docs]
class IndividualWithCoordinates:
"""Class to represent individuals with specified coordinates."""
SIZE = 21.0
def __init__(
self, individual: Individual,
x: float = 0.0, y: float = 0.0,
size: float = SIZE,
) -> None:
self.individual = individual
self.x = x
self.y = y
self.size = size
@property
def x_center(self) -> float:
return self.x + self.size / 2.0
@property
def y_center(self) -> float:
return self.y + self.size / 2.0
def __repr__(self) -> str:
return f"({self.x}, {self.y})"
[docs]
class Line:
"""Class to represent lines connecting individuals."""
# pylint: disable=too-many-arguments
def __init__(
self, x1: float, y1: float, x2: float, y2: float,
curve_base_height: float | None = None,
) -> None:
self.x1 = x1
self.y1 = y1
self.x2 = x2
self.y2 = y2
self.curve_base_height = curve_base_height
@property
def curved(self) -> bool:
return self.curve_base_height is not None
[docs]
def curve_p0(self) -> tuple[float, float]:
return (self.x1, self.y1)
[docs]
def curve_p1(self) -> tuple[float, float]:
assert self.curve_base_height is not None
return (self.x1 + 10, self.y1 + self.curve_base_height * 3.0)
[docs]
def curve_p2(self) -> tuple[float, float]:
return (self.x2, self.y2)
[docs]
def curve_p3(self) -> tuple[float, float]:
return (self.x2, self.y2)
[docs]
def inverse_curve_p1(self) -> tuple[float, float]:
assert self.curve_base_height is not None
return (self.x1 + 10, self.y1 - self.curve_base_height * 3.0)
[docs]
def inverse_curve_p2(self) -> tuple[float, float]:
return (self.x2, self.y2)
[docs]
def curve_y_at(self, t: float) -> float:
assert 0 <= t <= 1
one_minus_t = 1.0 - t
return (
(one_minus_t ** 3) * self.curve_p0()[1]
+ 3 * (one_minus_t ** 2) * t * self.curve_p1()[1]
+ 3 * one_minus_t * (t ** 2) * self.curve_p2()[1]
+ (t ** 3) * self.curve_p3()[1]
)
[docs]
def inverse_curve_y_at(self, t: float) -> float:
assert 0 <= t <= 1
one_minus_t = 1.0 - t
return (
(one_minus_t ** 3) * self.curve_p0()[1]
+ 3 * (one_minus_t ** 2) * t * self.inverse_curve_p1()[1]
+ 3 * one_minus_t * (t ** 2) * self.inverse_curve_p2()[1]
+ (t ** 3) * self.curve_p3()[1]
)
def __repr__(self) -> str:
return f"[({self.x1},{self.y1}) - ({self.x2},{self.y2})]"
[docs]
class Layout:
"""Represents a layout of a connected component of a family."""
def __init__(
self, intervals: list[IntervalForVertex] | None = None,
) -> None:
self._intervals = intervals
self.lines: list[Line] = []
self.positions: list[list[IndividualWithCoordinates]] = []
self._individuals_by_rank = []
self._id_to_position = {}
if intervals is not None:
self._individuals_by_rank = self._intervals_by_ranks()
self._id_to_position = self._generate_simple_layout(
self._individuals_by_rank,
)
self._generate_from_intervals()
BBox = namedtuple("BBox", ["min_x", "min_y", "max_x", "max_y"])
[docs]
def get_bbox(self) -> Layout.BBox:
"""Calculate the bounding box of a layout."""
min_x = 0.0
max_x = 0.0
min_y = 0.0
max_y = 0.0
for i in self._id_to_position.values():
min_x = min(min_x, i.x)
min_y = min(min_y, i.y)
max_x = max(max_x, i.x)
max_y = max(max_y, i.y)
return Layout.BBox(min_x, min_y, max_x, max_y)
[docs]
def translate(
self, x_offset: float = 0.0, y_offset: float = 0.0,
) -> None:
"""Translate a layout."""
for generation in self.positions:
for individual in generation:
individual.x += x_offset
individual.y += y_offset
for line in self.lines:
line.x1 += x_offset
line.x2 += x_offset
line.y1 += y_offset
line.y2 += y_offset
[docs]
def scale(self, scale: float = 1.0) -> None:
"""Scale the layout."""
for generation in self.positions:
for individual in generation:
individual.x *= scale
individual.y *= scale
individual.size *= scale
for line in self.lines:
line.x1 *= scale
line.x2 *= scale
line.y1 *= scale
line.y2 *= scale
@staticmethod
def _handle_broken_family_connections(family: Family) -> Layout:
# pylint: disable=protected-access
individuals = []
for person in family.full_members:
individuals.append(Individual(member=person))
layout = Layout()
layout._individuals_by_rank = [individuals]
layout._id_to_position = layout._generate_simple_layout([individuals])
layout.positions = [list(layout._id_to_position.values())]
return layout
@staticmethod
def _build_family_layout(
family: Family,
family_connections: FamilyConnections | None,
) -> Layout:
if family_connections is None:
logger.warning(
"missing family connections for family: %s", family.family_id)
return Layout._handle_broken_family_connections(family)
assert family_connections is not None
assert family_connections.is_connected()
sandwich_instance = family_connections.create_sandwich_instance()
intervals = SandwichSolver.solve(sandwich_instance)
if intervals is None:
logger.warning("no intervals for family: %s", family.family_id)
return Layout._handle_broken_family_connections(family)
individuals_intervals = [
interval
for interval in intervals
if interval.vertex.is_individual()
]
return Layout(individuals_intervals)
[docs]
@staticmethod
def from_family(
family: Family, add_missing_members: bool = True,
) -> list[Layout]:
"""Generate layout for each connected component of a family."""
family_connections = FamilyConnections.from_family(
family, add_missing_members=add_missing_members,
)
if not family_connections:
return [Layout._handle_broken_family_connections(family)]
if family_connections.is_connected():
logger.debug(
"building layout for connected family: %s", family.family_id)
return [Layout._build_family_layout(family, family_connections)]
layouts = []
# pylint: disable=import-outside-toplevel
for component in family_connections.connected_components():
members = [
m for m in family.full_members
if m.person_id in component
]
fam = Family.from_persons(members)
fc = FamilyConnections.from_family(fam)
# assert fc.is_connected(), fam.family_id
layouts.append(Layout._build_family_layout(fam, fc))
x_offset = 0.0
for layout in layouts:
layout.translate(x_offset, 0.0)
bbox = layout.get_bbox()
x_offset = bbox.max_x + IndividualWithCoordinates.SIZE + 4.0
return layouts
[docs]
@staticmethod
def from_family_layout(family: Family) -> Layout | None:
"""Construct layout for each connected component using layout data."""
if any(p.layout is None for p in family.full_members):
logger.warning(
"family %s has member without layout", family.family_id)
return None
family_connections = FamilyConnections.from_family(
family, add_missing_members=False,
)
if family_connections is None:
logger.warning(
"can't build family connections for family %s",
family.family_id)
return None
layout_positions = defaultdict(list)
for person in family_connections.members:
assert person.layout is not None, person
position = layout_parser(person.layout)
if position is None:
logger.warning(
"can't parse layout for person %s: %s",
person, person.layout)
return None
individual = family_connections.get_individual(person.person_id)
assert individual is not None
assert isinstance(position["rank"], int), (
person,
person.layout,
position,
)
layout_positions[position["rank"]].append(
IndividualWithCoordinates(
individual, position["x"], position["y"],
),
)
individual_positions: list[list] = [[]] * len(layout_positions)
for level, iwc in layout_positions.items():
individual_positions[level - 1] = iwc
individual_positions = [
sorted(level, key=lambda x: x.x)
for level in individual_positions
]
layout = Layout()
layout.positions = individual_positions
layout._create_lines() # pylint: disable=protected-access
return layout
@property
def id_to_position(self) -> dict[str, IndividualWithCoordinates]:
assert all(i.member is not None for i in self._id_to_position)
return {
k.member.person_id: v # type: ignore
for k, v in self._id_to_position.items()
}
@property
def individuals_by_rank(self) -> dict[str, int]:
"""Return a dictionary mapping individual person IDs to their rank.
The rank is determined by the order of individuals in the
`_individuals_by_rank` list.
The higher the rank, the higher the position in the list.
Returns:
A dictionary mapping individual person IDs to their rank.
"""
assert all(
i.member is not None
for individuals in self._individuals_by_rank
for i in individuals)
return {
individual.member.person_id: rank # type: ignore
for rank, individuals in enumerate(
self._individuals_by_rank, start=1,
)
for individual in individuals
}
[docs]
def apply_to_family(self, family: Family) -> None:
"""Store family layout as individuals attributes."""
for person_id, person in family.persons.items():
if person_id not in self.id_to_position:
continue
assert person_id in self.id_to_position, (
person_id,
family,
self.id_to_position,
)
position = self.id_to_position[person_id]
rank = self.individuals_by_rank[person_id]
position_value = f"{rank}:{position.x},{position.y}"
person.set_attr("layout", position_value)
def _generate_from_intervals(self) -> None:
self._optimize_drawing()
self._create_positioned_individuals()
self._create_lines()
def _optimize_drawing(self, max_iter: int = 100) -> None:
# for level in self._individuals_by_rank:
# print(level)
moved_individuals = -1
counter = 1
while moved_individuals and counter < max_iter:
# print
# print("new iter")
moved_individuals = 0
if counter % 6 < 3:
moved_individuals += self._align_parents_of_children()
# print(moved_individuals, "aligned parents of children")
moved_individuals += self._align_children_of_parents()
# print(moved_individuals, "aligned children of parents")
moved_individuals += self._align_multiple_mates()
else:
moved_individuals += self._align_children_of_parents()
# print(moved_individuals, "aligned children of parents")
moved_individuals += self._align_parents_of_children()
# print(moved_individuals, "aligned parents of children")
moved_individuals += self._align_multiple_mates()
# moved_individuals += self._set_mates_equally_apart()
# print(moved_individuals, "set mates equally apart")
moved_individuals += self._move_overlaps()
# print(moved_individuals, "moved overlapping individuals")
counter += 1
# print(("done", counter))
self._align_left()
def _create_positioned_individuals(self) -> None:
for level in self._individuals_by_rank:
self.positions.append([self._id_to_position[x] for x in level])
def _create_lines(self, y_offset: int = 15) -> None:
for level in self.positions:
for start, individual in enumerate(level):
if individual.individual.parents:
self.lines.append(
Line(
individual.x_center,
individual.y,
individual.x_center,
individual.y_center - y_offset,
),
)
for i, other_individual in enumerate(level[start + 1:]):
are_next_to_eachother = i == 0
if individual.individual.are_mates(
other_individual.individual,
):
middle_x = (
individual.x_center + other_individual.x_center
) / 2.0
if are_next_to_eachother:
self.lines.append(
Line(
individual.x + individual.size,
individual.y_center,
other_individual.x,
other_individual.y_center,
),
)
self.lines.append(
Line(
middle_x,
individual.y_center,
middle_x,
individual.y_center + y_offset,
),
)
continue
line = Line(
individual.x + individual.size,
individual.y_center,
other_individual.x,
other_individual.y_center,
y_offset,
)
self.lines.append(line)
percent_x = (middle_x - individual.x_center) / (
other_individual.x_center - individual.x_center
)
center_y = line.inverse_curve_y_at(percent_x)
self.lines.append(
Line(
middle_x,
center_y,
middle_x,
individual.y_center + y_offset,
),
)
i = 0
while i < len(level) - 1:
j = len(level) - 1
while i < j:
individual = level[i]
other_individual = level[j]
if individual.individual.are_siblings(
other_individual.individual,
):
self.lines.append(
Line(
individual.x_center,
individual.y_center - y_offset,
other_individual.x_center,
other_individual.y_center - y_offset,
),
)
i = j
break
j -= 1
i += 1
def _align_left(self, x_offset: int = 10) -> None:
min_x = min(i.x for i in list(self._id_to_position.values()))
for individual in list(self._id_to_position.values()):
individual.x = individual.x - min_x + x_offset
def _align_multiple_mates(self) -> int:
moved = 0
for level in self._individuals_by_rank:
for individual in level:
if len(individual.mating_units) > 2:
moved += self._align_multiple_mates_of_individual(
individual, level,
)
return moved
def _align_multiple_mates_of_individual(
self, individual: Individual, level: list[Individual],
) -> int:
moved = 0
others = {
mu.other_parent(individual) for mu in individual.mating_units
}
individual_position = self._id_to_position[individual]
indices = [level.index(i) for i in list(others)]
indices = sorted(indices)
common_parent_index = level.index(individual)
left_of_common_parent = [i for i in indices if i < common_parent_index]
right_of_common_parent = [
i for i in indices if i > common_parent_index
]
right_of_common_parent_reversed = list(
reversed(right_of_common_parent),
)
for index, parent_index in enumerate(left_of_common_parent[:-1]):
parent = level[parent_index]
parent_position = self._id_to_position[parent]
to_compare = level[left_of_common_parent[index + 1]]
arch_width = individual_position.x - parent_position.x
compare_width = (
individual_position.x
- self._id_to_position[to_compare].x
+ individual_position.size
)
if arch_width < 2 * compare_width:
moved += self._move(
[parent_position], -(2 * compare_width - arch_width),
)
for i, parent_index in enumerate(right_of_common_parent_reversed[:-1]):
parent = level[parent_index]
parent_position = self._id_to_position[parent]
to_compare = level[right_of_common_parent_reversed[i + 1]]
arch_width = parent_position.x - individual_position.x
compare_width = (
self._id_to_position[to_compare].x
- individual_position.x
+ individual_position.size
)
if arch_width < 2 * compare_width:
moved += self._move(
[parent_position], 2 * compare_width - arch_width,
)
return moved
def _set_mates_equally_apart(self) -> int:
moved = 0
for level in self._individuals_by_rank:
mating_units = self._get_mates_on_level(level)
dual_mating_units = {
(mu1, mu2)
for mu1 in mating_units
for mu2 in mating_units
if (mu1.father is mu2.father) ^ (mu1.mother is mu2.mother)
}
for mu1, mu2 in dual_mating_units:
if mu1.father is mu2.father:
ordered_parents_ids = [mu1.mother, mu1.father, mu2.mother]
else:
ordered_parents_ids = [mu1.father, mu1.mother, mu2.father]
ordered_parents = [
self._id_to_position[i] for i in ordered_parents_ids
]
if ordered_parents[0].x > ordered_parents[2].x:
ordered_parents[0], ordered_parents[2] = (
ordered_parents[2],
ordered_parents[0],
)
dist1 = ordered_parents[1].x - ordered_parents[0].x
dist2 = ordered_parents[2].x - ordered_parents[1].x
assert dist1 > 0
assert dist2 > 0
if dist1 < 0 or dist2 < 0 or abs(dist1 - dist2) < 1e-5:
return 0
if dist1 > dist2:
moved += self._move(ordered_parents[2:], dist1 - dist2)
else:
moved += self._move(ordered_parents[1:], dist2 - dist1)
return moved
def _move_overlaps(self, gap_size: float = 8.0) -> int:
moved = 0
first_individual_position = self._id_to_position[
self._individuals_by_rank[0][0]
]
min_gap = first_individual_position.size + gap_size
for level in self._individuals_by_rank:
level_with_positions = [self._id_to_position[i] for i in level]
for index, individual1 in enumerate(level_with_positions):
for individual2 in level_with_positions[index + 1: index + 2]:
diff = individual2.x - individual1.x
if min_gap - diff > 1:
moved += self._move([individual2], min_gap - diff)
return moved
def _align_children_of_parents(self) -> int:
moved = 0
for level in self._individuals_by_rank:
mating_units = self._get_mates_on_level(level)
for mates in mating_units:
moved += self._center_children_of_parents(mates)
return moved
def _center_children_of_parents(self, mating_unit: MatingUnit) -> int:
children = self._get_first_and_last_children_positions(mating_unit)
children_center = (children[0].x + children[1].x) / 2.0
mother = self._id_to_position[mating_unit.mother]
father = self._id_to_position[mating_unit.father]
parents_center = (father.x + mother.x) / 2.0
offset = parents_center - children_center
return self._move(children, offset)
def _align_parents_of_children(self) -> int:
moved = 0
for level in reversed(self._individuals_by_rank):
sibship_groups = self._get_sibships_on_level(level)
for sibship in sibship_groups:
moved += self._center_parents_of_children(sibship)
return moved
def _center_parents_of_children(self, sibship: list[Individual]) -> int:
assert len(sibship) > 0
some_child = sibship[0]
start_x = self._id_to_position[some_child].x
end_x = self._id_to_position[sibship[len(sibship) - 1]].x
children_center = (start_x + end_x) / 2.0
assert some_child.parents is not None
mother = some_child.parents.mother
father = some_child.parents.father
mother_position = self._id_to_position[mother]
father_position = self._id_to_position[father]
ordered_parents = [mother_position, father_position]
if mother_position.x > father_position.x:
ordered_parents = [father_position, mother_position]
parents_center = (mother_position.x + father_position.x) / 2.0
offset = children_center - parents_center
if offset > 0:
return self._move(ordered_parents[1:], offset)
return self._move(ordered_parents[0:1], offset)
def _move(
self, individuals: list[IndividualWithCoordinates],
offset: float,
already_moved: set[IndividualWithCoordinates] | None = None,
min_gap: float = 8.0,
) -> int:
assert len(individuals) > 0
if already_moved is None:
already_moved = set()
if abs(offset) < 1e-5:
return 0
individuals = list(set(individuals) - already_moved)
min_individual = reduce(
lambda a, b: a if a.x < b.x else b, individuals,
)
max_individual = reduce(
lambda a, b: a if a.x > b.x else b, individuals,
)
level = self._get_level_of_individual(min_individual.individual)
assert level is not None
level_individuals = level[
level.index(min_individual.individual):
level.index(max_individual.individual) + 1
]
individuals = [self._id_to_position[x] for x in level_individuals]
individuals = list(set(individuals) - already_moved)
if len(individuals) == 0:
return 0
to_move_offset = 0.0
if offset > 0:
new_end = max_individual.x + offset
to_move = {
i
for x in level
for i in [self._id_to_position[x]]
if min_individual.x <= i.x <= new_end
}
to_move -= already_moved
to_move -= set(individuals)
if to_move != set():
to_move_offset = max(
new_end - i.x + min_gap * 2.0 + i.size for i in to_move
)
else:
new_start = min_individual.x + offset
to_move = {
i
for x in level
for i in [self._id_to_position[x]]
if new_start <= i.x <= max_individual.x
}
to_move -= already_moved
to_move -= set(individuals)
if to_move != set():
to_move_offset = min(
new_start - i.x - min_gap * 2.0 - i.size for i in to_move
)
for individual in individuals:
individual.x += offset
# print("moving with", offset)
other_moved = 0
if to_move != set():
other_moved = self._move(
list(to_move), to_move_offset, already_moved | set(individuals),
)
return len(individuals) + other_moved
@staticmethod
def _get_mates_on_level(level: list[Individual]) -> set[MatingUnit]:
return {mu for i in level for mu in i.mating_units}
def _get_first_and_last_children_positions(
self, mating_unit: MatingUnit,
) -> list[IndividualWithCoordinates]:
children = mating_unit.children.individuals
children_positions = [self._id_to_position[x] for x in children]
children_positions = sorted(
children_positions, key=lambda x: x.x)
return [
children_positions[0],
children_positions[len(children_positions) - 1],
]
@staticmethod
def _get_sibships_on_level(
level: list[Individual],
) -> list[Any | list[Individual]]:
individuals_with_parents = [i for i in level if bool(i.parents)]
def reducer(
acc: list[list[Individual]], x: Individual,
) -> list[list[Individual]]:
if len(acc) == 0:
return [[x]]
last_array = acc[len(acc) - 1]
if x.are_siblings(last_array[0]):
last_array.append(x)
else:
acc.append([x])
return acc
return reduce(reducer, individuals_with_parents, [])
def _get_level_of_individual(
self, individual: Individual,
) -> list[Individual] | None:
for individuals_on_level in self._individuals_by_rank:
if individual in individuals_on_level:
return individuals_on_level
return None
@staticmethod
def _generate_simple_layout(
individuals_by_rank: list[list[Individual]],
level_heigh: float = 30.0,
x_offset: float = 20.0,
y_offset: float = 20.0,
gap: float = 8.0,
) -> dict[Individual, IndividualWithCoordinates]:
result = {}
original_x_offset = x_offset
for rank, individuals in enumerate(individuals_by_rank):
x_offset = original_x_offset
for individual in individuals:
position = IndividualWithCoordinates(
individual, x_offset, (rank + 1) * level_heigh + y_offset,
)
result[individual] = position
x_offset += position.size + gap
return result
def _intervals_by_ranks(self) -> list[list[Individual]]:
assert self._intervals is not None
result = []
rank_to_individuals = defaultdict(list)
for interval in self._intervals:
rank_to_individuals[
cast(Individual, interval.vertex).rank].append(interval)
for key in sorted(rank_to_individuals.keys()):
sorted_intervals = sorted(
rank_to_individuals[key], key=lambda x: (x.left, x.right),
)
# print(list(map(lambda x: x.vertex, sorted_intervals)))
result.append([x.vertex for x in sorted_intervals])
return cast(list[list[Individual]], result)