def set_up_local_cluster(cluster_conf: dict[str, Any]) -> Cluster:
    """Build a dask ``LocalCluster`` from a cluster configuration.

    The configuration is shallow-copied, so the mapping passed in is not
    modified. The ``"number_of_workers"`` entry is forwarded to
    ``LocalCluster`` as ``n_workers`` and ``"threads_per_worker"`` is
    forwarded under its own name; either one is left out entirely when it
    is missing or ``None``. Every other entry is passed through unchanged
    as a keyword argument.

    Args:
        cluster_conf: Keyword configuration for the local cluster.

    Returns:
        The newly created ``LocalCluster``.
    """
    # pylint: disable=import-outside-toplevel
    from dask.distributed import LocalCluster

    cluster_kwargs = copy.copy(cluster_conf)
    workers = cluster_kwargs.pop("number_of_workers", None)
    threads = cluster_kwargs.pop("threads_per_worker", None)

    # Only settings that were actually provided reach LocalCluster, so its
    # own defaults apply to the rest.
    requested = {"n_workers": workers, "threads_per_worker": threads}
    cluster_kwargs.update(
        {key: value for key, value in requested.items() if value is not None},
    )
    return LocalCluster(**cluster_kwargs)
def set_up_manual_client(cluster_conf: dict[str, Any]) -> Client:
    """Create a dask client for the scheduler named in the configuration.

    Args:
        cluster_conf: Cluster configuration. Its ``"params"`` mapping must
            contain the scheduler ``"address"``; every other entry of
            ``"params"`` is forwarded to ``Client`` as a keyword argument.

    Returns:
        A ``Client`` created with the configured scheduler address.

    Raises:
        ValueError: If ``"params"`` is missing or empty, or its
            ``"address"`` entry is missing or ``None``.
    """
    # Pop from a copy, never from the caller's mapping: removing "address"
    # from the shared configuration would make any later call with the same
    # configuration fail with the ValueError below.
    params = dict(cluster_conf.get("params") or {})
    scheduler_address = params.pop("address", None)
    if scheduler_address is None:
        raise ValueError(
            "Cluster configuration must contain scheduler 'address' key.")
    return Client(address=scheduler_address, **params)
def setup_client_from_config(
    cluster_config: dict[str, Any],
    *,
    number_of_workers: int | None = None,
) -> tuple[Client, dict[str, Any]]:
    """Create a dask client from the provided config.

    A configuration of type ``"manual"`` is handed to
    ``set_up_manual_client`` and returned right away. For any other type the
    matching factory from ``_CLUSTER_TYPES`` builds a cluster, which is then
    scaled to a fixed size or, when no size is known and ``"adapt_params"``
    is configured, switched to adaptive mode.

    Args:
        cluster_config: Cluster configuration; ``"type"`` is required.
            ``"params"``, ``"number_of_workers"``, the deprecated
            ``"number_of_threads"`` and ``"adapt_params"`` are optional.
        number_of_workers: Worker count that takes precedence over the
            count found in the configuration.

    Returns:
        The created client together with ``cluster_config``.

    Raises:
        KeyError: If ``"type"`` is missing or is not a key of
            ``_CLUSTER_TYPES``.
    """
    logger.info("CLUSTER CONFIG: %s", cluster_config)
    cluster_type = cluster_config["type"]

    if cluster_type == "manual":
        return set_up_manual_client(cluster_config), cluster_config

    # Build a fresh parameter mapping for the factory instead of writing the
    # override into the caller's "params": the configuration is returned to
    # the caller and may be shared, so it must not pick up extra keys here.
    cluster_params = {
        **(cluster_config.get("params") or {}),
        "number_of_workers": number_of_workers,
    }
    cluster = _CLUSTER_TYPES[cluster_type](cluster_params)

    number_of_threads_config = cluster_config.get("number_of_threads")
    if number_of_threads_config is not None:
        logger.warning(
            "The 'number_of_threads' key in the cluster config is deprecated. "
            "Use 'number_of_workers' instead.")

    # The deprecated key only serves as a fallback when the new key is absent.
    number_of_workers_config = cluster_config.get(
        "number_of_workers", number_of_threads_config)

    if number_of_workers is not None:
        logger.info(
            "Overriding the configurated number of workers from "
            "CLI parameter to %d workers.",
            number_of_workers,
        )
        number_of_workers_config = number_of_workers

    # A known worker count wins over adaptive scaling.
    if number_of_workers_config is not None:
        cluster.scale(n=number_of_workers_config)
    elif "adapt_params" in cluster_config:
        cluster.adapt(**cluster_config["adapt_params"])

    client = Client(cluster)
    return client, cluster_config
def setup_client(
    cluster_name: str | None = None,
    number_of_workers: int | None = None,
) -> tuple[Client, dict[str, Any]]:
    """Create a dask client for a named cluster from the dask configuration.

    Cluster definitions are read from the ``dae_named_cluster.clusters``
    entry of the dask configuration and indexed by their ``"name"``. When no
    name is given, the one stored under ``dae_named_cluster.default`` is
    used.

    Args:
        cluster_name: Name of the configured cluster, or ``None`` to use the
            configured default.
        number_of_workers: Worker count forwarded to
            ``setup_client_from_config``.

    Returns:
        Whatever ``setup_client_from_config`` returns for the selected
        cluster configuration.

    Raises:
        KeyError: If no configured cluster has the selected name.
    """
    selected_name = cluster_name
    if selected_name is None:
        selected_name = dask.config.get(  # pyright: ignore
            "dae_named_cluster.default")

    # Index the configured clusters by name; a later entry replaces an
    # earlier one that carries the same name.
    configs_by_name: dict[str, dict[str, Any]] = {}
    for entry in dask.config.get(  # pyright: ignore
            "dae_named_cluster.clusters"):
        configs_by_name[entry["name"]] = entry

    return setup_client_from_config(
        configs_by_name[selected_name],
        number_of_workers=number_of_workers,
    )