Source code for users_api.management.commands.groups_cleanup

import logging

from datasets_api.models import Dataset
from django.contrib.auth import get_user_model
from django.contrib.auth.models import Group
from django.core.management.base import BaseCommand
from django.db.models import Q

from .dataset_mixin import DatasetBaseMixin

logger = logging.getLogger(__name__)


[docs] class Command(BaseCommand, DatasetBaseMixin): help = "clean up groups that are not in use" def __init__(self, **kwargs): DatasetBaseMixin.__init__(self) BaseCommand.__init__(self, **kwargs)
[docs] def add_arguments(self, parser): parser.add_argument("--dry-run", "-n", action="store_true")
[docs] def cleanup_wdae_datasets(self, dry_run): dataset_ids = set(self.gpf_instance.get_genotype_data_ids()) datasets = list(Dataset.objects.all()) for dataset in datasets: logger.debug(f"checking wdae dataset: {dataset.dataset_id}") if dataset.dataset_id not in dataset_ids: logger.info(f"removing dataset {dataset.dataset_id}") if not dry_run: dataset.delete()
[docs] def cleanup_groups(self, dry_run): groups = list(Group.objects.filter( Q(groupobjectpermission__permission__codename="view")).distinct()) for group in groups: logger.debug( f"checking group: {group.name} ({group.id})") users = list(group.user_set.all()) if users: logger.debug( f"group {group.name} has users {len(users)}; keeping...") logger.debug(f"{[user.email for user in users]}") continue logger.debug(f"group {group.name} has no attached users") permissions = list(group.groupobjectpermission_set.all()) for permission in permissions: if permission.content_object is None: logger.debug(f"removing permission: {permission}") if not dry_run: permission.delete() permissions = list(group.groupobjectpermission_set.all()) logger.debug(f"permissions: {permissions};") if not permissions: logger.info(f"removing group {group.name}") if not dry_run: group.delete()
[docs] def cleanup_users(self, dry_run): User = get_user_model() users = User.objects.all() for user in users: email = user.email assert email == email.lower() groups = list(user.groups.all()) group_names = [g.name for g in groups] # print(groups) # print(group_names) if email not in group_names: logger.warning( f"a group for user {email} needs renaming {group_names}") for group in groups: if email != group.name.strip().lower(): continue logger.warning( f"the group that should be renamed is: {group}") if not dry_run: group.name = email group.save() break
[docs] def handle(self, *args, **options): dry_run = options["dry_run"] self.cleanup_wdae_datasets(dry_run) self.cleanup_groups(dry_run) self.cleanup_users(dry_run)