from __future__ import annotations
import argparse
import gc
import gzip
import itertools
import logging
import os
import sys
import traceback
from collections.abc import Iterable, Sequence
from contextlib import chdir, closing
from dataclasses import dataclass
from pathlib import Path
from types import TracebackType
from typing import Any, TextIO, cast
from pysam import TabixFile, tabix_compress, tabix_index
from gain import __version__
from gain.annotation import annotation_pipeline as _annotation_pipeline
from gain.annotation.annotate_utils import (
add_common_annotation_arguments,
add_input_files_to_task_graph,
build_cli_genomic_context,
cache_pipeline_resources,
check_resource_locality,
emit_annotation_plan,
get_grr_from_context,
get_pipeline_from_context,
handle_default_args,
maybe_remove_work_dir,
maybe_wrap_reannotation,
produce_partfile_paths,
produce_regions,
stringify,
)
from gain.annotation.annotation_config import (
RawAnnotatorsConfig,
RawPipelineConfig,
)
from gain.annotation.annotation_factory import (
build_annotation_pipeline,
)
from gain.annotation.annotation_pipeline import (
AnnotationPipeline,
Attribute,
ReannotationPipeline,
)
from gain.annotation.processing_pipeline import (
Annotation,
AnnotationPipelineAnnotatablesBatchFilter,
AnnotationPipelineAnnotatablesFilter,
AnnotationsWithSource,
DeleteAttributesFromAWSBatchFilter,
DeleteAttributesFromAWSFilter,
)
from gain.annotation.record_to_annotatable import (
RECORD_TO_ANNOTATABLE_CONFIGURATION,
DaeAlleleRecordToAnnotatable,
RecordToCNVAllele,
RecordToPosition,
RecordToRegion,
RecordToVcfAllele,
add_record_to_annotable_arguments,
build_record_to_annotatable,
)
from gain.genomic_resources.reference_genome import (
ReferenceGenome,
build_reference_genome_from_resource_id,
)
from gain.genomic_resources.repository_factory import (
build_genomic_resource_repository,
)
from gain.task_graph.cli_tools import TaskGraphCli
from gain.task_graph.graph import TaskGraph
from gain.utils.fs_utils import (
is_compressed_filename,
strip_compression_suffix,
tabix_index_filename,
)
from gain.utils.processing_pipeline import Filter, PipelineProcessor, Source
from gain.utils.regions import Region
from gain.utils.verbosity_configuration import VerbosityConfiguration
logger = logging.getLogger("annotate_tabular")
class _CSVSource(Source):
"""Source for delimiter-separated values files."""
def __init__(
self,
path: str,
ref_genome: ReferenceGenome | None,
columns_args: dict[str, str],
input_separator: str,
):
self.path = path
self.ref_genome = ref_genome
self.columns_args = columns_args
self.source_file: TextIO | TabixFile
self.input_separator = input_separator
self.header: list[str] = self._extract_header()
def __enter__(self) -> _CSVSource:
index_filename = (
tabix_index_filename(self.path)
if is_compressed_filename(self.path) else None
)
if index_filename is not None:
self.source_file = TabixFile(
self.path, index=index_filename, encoding="utf-8")
elif is_compressed_filename(self.path):
self.source_file = gzip.open(self.path, "rt")
self.source_file.readline() # Skip header line
else:
self.source_file = open(self.path, "rt")
self.source_file.readline() # Skip header line
if self.ref_genome is not None:
self.ref_genome.open()
return self
def __exit__(
self,
exc_type: type[BaseException] | None,
exc_value: BaseException | None,
exc_tb: TracebackType | None,
) -> bool:
if exc_type is not None:
logger.error(
"exception during annotation: %s, %s, %s",
exc_type, exc_value, traceback.format_tb(exc_tb))
self.source_file.close()
if self.ref_genome is not None:
self.ref_genome.close()
return exc_type is None
def _extract_header(self) -> list[str]:
if is_compressed_filename(self.path):
with gzip.open(self.path, "rt") as in_file_raw:
raw_header = in_file_raw.readline()
else:
with open(self.path, "rt") as in_file_raw:
raw_header = in_file_raw.readline()
return [
c.strip("#")
for c in raw_header.strip("\r\n").split(self.input_separator)
]
def _get_line_iterator(self, region: Region | None) -> Iterable:
if not isinstance(self.source_file, TabixFile):
return self.source_file
if region is None:
return self.source_file.fetch() # type: ignore
assert region.start is not None
return self.source_file.fetch( # type: ignore
region.chrom, region.start - 1, region.stop)
def fetch(
self, region: Region | None = None,
) -> Iterable[AnnotationsWithSource]:
line_iterator = self._get_line_iterator(region)
record_to_annotatable = build_record_to_annotatable(
self.columns_args, set(self.header),
ref_genome=self.ref_genome)
reg_start = region.start if region and region.start is not None else 1
errors = []
for lnum, line in enumerate(line_iterator):
try:
columns = line.strip("\n\r").split(self.input_separator)
record = dict(zip(self.header, columns, strict=True))
annotatable = record_to_annotatable.build(record)
if annotatable.position < reg_start:
continue
yield AnnotationsWithSource(
record, [Annotation(annotatable, dict(record))],
)
except Exception as ex: # pylint: disable=broad-except
logger.exception(
"unexpected input data format at line: %s", line)
errors.append((
lnum, line,
"".join(traceback.format_exception(ex)), str(ex)))
if len(errors) >= 10:
break
if len(errors) > 0:
for _lnum, line, error, message in errors:
logger.error("line: %s", line)
logger.error("\t%s", message)
logger.error("\t%s", error)
lnum, line, error, message = errors[0]
raise ValueError(
f"errors occured during reading of CSV file starting at "
f"line: {line.strip()}: {message}")
class _CSVBatchSource(Source):
"""Batched source for delimiter-separated values files."""
def __init__(
self,
path: str,
ref_genome: ReferenceGenome | None,
columns_args: dict[str, str],
input_separator: str,
batch_size: int,
):
self.source = _CSVSource(
path, ref_genome, columns_args, input_separator)
self.header = self.source.header
self.batch_size = batch_size
def __enter__(self) -> _CSVBatchSource:
self.source.__enter__()
return self
def __exit__(
self,
exc_type: type[BaseException] | None,
exc_value: BaseException | None,
exc_tb: TracebackType | None,
) -> bool:
if exc_type is not None:
logger.error(
"exception during annotation: %s, %s, %s",
exc_type, exc_value, traceback.format_tb(exc_tb))
self.source.__exit__(exc_type, exc_value, exc_tb)
return exc_type is None
def fetch(
self, region: Region | None = None,
) -> Iterable[Sequence[AnnotationsWithSource]]:
records = self.source.fetch(region)
while batch := tuple(itertools.islice(records, self.batch_size)):
yield batch
class _CSVWriter(Filter):
"""Writes delimiter-separated values to a file."""
def __init__(
self,
path: str,
separator: str,
header: _CSVHeader,
) -> None:
self.path = path
self.separator = separator
self.header = header
self.out_file: Any
def __enter__(self) -> _CSVWriter:
self.out_file = open(self.path, "w")
header_row = self.separator.join([
*self.header.input_header,
*self.header.annotation_header,
])
self.out_file.write(f"{header_row}\n")
return self
def __exit__(
self,
exc_type: type[BaseException] | None,
exc_value: BaseException | None,
exc_tb: TracebackType | None,
) -> bool:
if exc_type is not None:
logger.error(
"exception during annotation: %s, %s, %s",
exc_type, exc_value, traceback.format_tb(exc_tb))
self.out_file.close()
return exc_type is None
def filter(self, data: AnnotationsWithSource) -> None:
context = data.annotations[0].context
source = data.source
source_result = {
col: source[col]
for col in self.header.input_header
}
annotation_result = {
col: context[col]
for col in self.header.annotation_header
}
self.out_file.write(
self.separator.join(
stringify(val)
for val in [
*source_result.values(), *annotation_result.values()]))
self.out_file.write("\n")
class _CSVBatchWriter(Filter):
"""Writes delimiter-separated values to a file in batches."""
def __init__(
self,
path: str,
separator: str,
header: _CSVHeader,
) -> None:
self.writer = _CSVWriter(path, separator, header)
def __enter__(self) -> _CSVBatchWriter:
self.writer.__enter__()
return self
def __exit__(
self,
exc_type: type[BaseException] | None,
exc_value: BaseException | None,
exc_tb: TracebackType | None,
) -> bool:
if exc_type is not None:
logger.error(
"exception during annotation: %s, %s, %s",
exc_type, exc_value, traceback.format_tb(exc_tb))
self.writer.__exit__(exc_type, exc_value, exc_tb)
return exc_type is None
def filter(self, data: Sequence[AnnotationsWithSource]) -> None:
for record in data:
self.writer.filter(record)
@dataclass
class _CSVHeader:
input_header: list[str]
annotation_header: list[str]
def _is_reannotation(pipeline: AnnotationPipeline) -> bool:
# Use the canonical class (not the module-level re-export, which tests
# replace with a mocker.spy) so the check survives spying.
return isinstance(pipeline, _annotation_pipeline.ReannotationPipeline)
def _build_new_header(
input_header: list[str],
annotation_attributes: list[Attribute],
attributes_to_delete: Sequence[str],
*,
reannotation: bool = False,
) -> _CSVHeader:
annotation_header = [
attr.name for attr in annotation_attributes if not attr.internal
]
drop = set(attributes_to_delete)
if reannotation:
# On a reannotation run every new-pipeline attribute is re-emitted in
# the annotation section -- including COPIED (reused) attributes whose
# value is carried through the annotation context from the source row
# (#111). Drop the matching input column so the attribute appears
# exactly once instead of being duplicated. A plain annotation run,
# by contrast, preserves a same-named input column and appends the
# freshly-computed one (the "append columns" behaviour).
drop |= set(annotation_header)
result = [col for col in input_header if col not in drop]
return _CSVHeader(
result,
annotation_header,
)
def _build_sequential(
input_path: str,
pipeline: AnnotationPipeline,
output_path: str,
args: dict[str, Any],
reference_genome: ReferenceGenome | None,
attributes_to_delete: Sequence[str],
) -> PipelineProcessor:
source = _CSVSource(
input_path,
reference_genome,
args["columns_args"],
args["input_separator"],
)
filters: list[Filter] = []
new_header = _build_new_header(
source.header, pipeline.get_attributes(), attributes_to_delete,
reannotation=_is_reannotation(pipeline))
filters.extend([
DeleteAttributesFromAWSFilter(attributes_to_delete),
AnnotationPipelineAnnotatablesFilter(pipeline),
_CSVWriter(output_path, args["output_separator"], new_header),
])
return PipelineProcessor(source, filters)
def _build_batched(
input_path: str,
pipeline: AnnotationPipeline,
output_path: str,
args: dict[str, Any],
reference_genome: ReferenceGenome | None,
attributes_to_delete: Sequence[str],
) -> PipelineProcessor:
source = _CSVBatchSource(
input_path,
reference_genome,
args["columns_args"],
args["input_separator"],
args["batch_size"],
)
filters: list[Filter] = []
new_header = _build_new_header(
source.header, pipeline.get_attributes(), attributes_to_delete,
reannotation=_is_reannotation(pipeline))
filters.extend([
DeleteAttributesFromAWSBatchFilter(attributes_to_delete),
AnnotationPipelineAnnotatablesBatchFilter(pipeline),
_CSVBatchWriter(output_path, args["output_separator"], new_header),
])
return PipelineProcessor(source, filters)
def _annotate_csv(
output_path: str,
pipeline_config: RawAnnotatorsConfig,
grr_definition: dict,
reference_genome_resource_id: str | None,
region: Region | None,
args: dict[str, Any],
) -> None:
"""Annotate a CSV file using a processing pipeline."""
build_cli_genomic_context(args)
grr = build_genomic_resource_repository(definition=grr_definition)
ref_genome = None
if reference_genome_resource_id is not None:
ref_genome = build_reference_genome_from_resource_id(
reference_genome_resource_id, grr)
pipeline = build_annotation_pipeline(
pipeline_config, grr,
allow_repeated_attributes=args["allow_repeated_attributes"],
work_dir=Path(args["work_dir"]),
)
pipeline = maybe_wrap_reannotation(pipeline, args, grr)
attributes_to_delete = (
pipeline.deleted_attributes
if isinstance(pipeline, ReannotationPipeline) else [])
_annotate_tabular_helper(
input_path=args["input"],
pipeline=pipeline,
output_path=output_path,
args=args,
reference_genome=ref_genome,
region=region,
attributes_to_delete=attributes_to_delete,
)
def _concat(
partfile_paths: list[str],
output_path: str,
keep_parts: bool, # noqa: FBT001
) -> None:
"""Concatenate multiple CSV files into a single CSV file *in order*."""
# Get any header from the partfiles, they should all be equal
# and usable as a final output header
with open(partfile_paths[0], "r") as partfile:
header = partfile.readline().strip()
with open(output_path, "w") as outfile:
outfile.write(header)
for path in partfile_paths:
with open(path, "r") as partfile:
partfile.readline() # skip header
for line in partfile:
outfile.write("\n")
outfile.write(line.strip("\r\n"))
outfile.write("\n")
if not keep_parts:
for partfile_path in partfile_paths:
os.remove(partfile_path)
def _read_header(filepath: str, separator: str = "\t") -> list[str]:
"""Extract header from columns file."""
if is_compressed_filename(filepath):
file = gzip.open(filepath, "rt") # noqa: SIM115
else:
file = open(filepath, "r") # noqa: SIM115
with file:
header = file.readline()
return [c.strip() for c in header.split(separator)]
def _count_tabular_rows(input_path: str, limit: int) -> int:
"""Count data rows (excluding the header), short-circuiting at limit."""
opener = gzip.open if is_compressed_filename(input_path) else open
count = 0
with opener(input_path, "rt") as in_file:
in_file.readline() # skip header
for _ in in_file:
count += 1
if count >= limit:
break
return count
def _tabix_compress(filepath: str, output_path: str | None = None) -> None:
"""Produce a tabix-compressed version of the given variants file."""
if output_path is None:
output_path = f"{filepath}.gz"
tabix_compress(filepath, output_path, force=True)
if os.path.exists(filepath):
os.remove(filepath)
def _tabix_index(filepath: str, args: dict | None = None) -> None:
"""Produce a tabix index file for the given variants file."""
header = _read_header(filepath)
line_skip = 0 if header[0].startswith("#") else 1
header = [c.strip("#") for c in header]
record_to_annotatable = build_record_to_annotatable(
args if args is not None else {},
set(header),
)
if isinstance(record_to_annotatable, (RecordToRegion,
RecordToCNVAllele)):
seq_col = header.index(record_to_annotatable.chrom_col)
start_col = header.index(record_to_annotatable.pos_beg_col)
end_col = header.index(record_to_annotatable.pos_end_col)
elif isinstance(record_to_annotatable, RecordToVcfAllele):
seq_col = header.index(record_to_annotatable.chrom_col)
start_col = header.index(record_to_annotatable.pos_col)
end_col = start_col
elif isinstance(
record_to_annotatable,
(RecordToPosition, DaeAlleleRecordToAnnotatable)):
seq_col = header.index(record_to_annotatable.chrom_column)
start_col = header.index(record_to_annotatable.pos_column)
end_col = start_col
else:
raise TypeError(
"Could not generate tabix index: record"
f" {type(record_to_annotatable)} is of unsupported type.")
logger.info(
"producing tabix index for '%s': "
"tabix_index(%s, seq_col=%s, start_col=%s, end_col=%s, "
"line_skip=%s, force=True)",
filepath, filepath, seq_col, start_col, end_col, line_skip)
try:
tabix_index(filepath,
seq_col=seq_col,
start_col=start_col,
end_col=end_col,
line_skip=line_skip,
force=True)
except Exception: # pylint: disable=broad-except
logger.exception("failed to create tabix index for '%s'", filepath)
raise
def _add_tasks_tabixed(
args: dict[str, Any],
task_graph: TaskGraph,
output_path: str,
pipeline_config: RawPipelineConfig,
grr_definition: dict[str, Any],
ref_genome_id: str | None,
) -> None:
# output_path carries the final compression suffix (.gz/.bgz); annotate
# into the uncompressed working file, then compress to the final name.
# Without a suffix, working_path would equal output_path and the compress
# task would tabix_compress(out, out, force=True), truncating it in place.
assert is_compressed_filename(output_path), (
f"_add_tasks_tabixed: output_path must carry a compression suffix, "
f"got {output_path!r}")
working_path = strip_compression_suffix(output_path)
with closing(
TabixFile(
args["input"], index=tabix_index_filename(args["input"])),
) as pysam_file:
regions = produce_regions(pysam_file, args["region_size"])
file_paths = produce_partfile_paths(
args["input"], regions, args["work_dir"])
annotation_tasks = []
for region, path in zip(regions, file_paths, strict=True):
annotation_tasks.append(
task_graph.create_task(
f"part-{str(region).replace(':', '-')}",
_annotate_csv,
args=[
path,
pipeline_config,
grr_definition,
ref_genome_id,
region,
args,
],
deps=[],
intermediate_output_files=[path],
),
)
concat_task = task_graph.create_task(
"concat",
_concat,
args=[file_paths, working_path, args["keep_parts"]],
deps=annotation_tasks,
input_files=file_paths,
intermediate_output_files=[working_path],
)
compress_task = task_graph.create_task(
"tabix_compress",
_tabix_compress,
args=[working_path, output_path],
deps=[concat_task],
input_files=[working_path],
output_files=[output_path],
)
task_graph.create_task(
"tabix_index",
_tabix_index,
args=[output_path, args["columns_args"]],
deps=[compress_task],
input_files=[output_path],
output_files=[f"{output_path}.tbi"],
)
def _add_tasks_plaintext(
args: dict[str, Any],
task_graph: TaskGraph,
output_path: str,
pipeline_config: RawPipelineConfig,
grr_definition: dict[str, Any],
ref_genome_id: str | None,
) -> None:
if is_compressed_filename(output_path):
working_path = strip_compression_suffix(output_path)
annotate_task = task_graph.create_task(
"annotate_all",
_annotate_csv,
args=[
working_path,
pipeline_config,
grr_definition,
ref_genome_id,
None,
args,
],
deps=[],
intermediate_output_files=[working_path],
)
task_graph.create_task(
"tabix_compress",
_tabix_compress,
args=[working_path, output_path],
deps=[annotate_task],
output_files=[output_path],
)
else:
task_graph.create_task(
"annotate_all",
_annotate_csv,
args=[
output_path,
pipeline_config,
grr_definition,
ref_genome_id,
None,
args,
],
deps=[],
output_files=[output_path],
)
def _build_argument_parser() -> argparse.ArgumentParser:
"""Configure argument parser."""
parser = argparse.ArgumentParser(
description="Annotate columns",
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)
add_record_to_annotable_arguments(parser)
parser.add_argument(
"--input-separator", "--in-sep", default=None,
help="The column separator in the input; defaults to a tab, "
"or a comma when the input filename ends in .csv "
"(optionally .gz/.bgz compressed)")
parser.add_argument(
"--output-separator", "--out-sep", default=None,
help="The column separator in the output")
parser.add_argument(
"-n", "--dry-run",
help="Print the annotation/reannotation plan and exit without "
"writing any output.",
action="store_true",
default=False,
)
add_common_annotation_arguments(parser)
return parser
def _adjust_default_input_separator(args: dict[str, Any]) -> dict[str, Any]:
if args["input_separator"] is not None:
return args
suffixes = [s.lower() for s in Path(args["input"]).suffixes]
if suffixes and suffixes[-1] in (".gz", ".bgz"):
suffixes = suffixes[:-1]
if suffixes and suffixes[-1] == ".csv":
args["input_separator"] = ","
logger.info(
"input '%s' has a .csv extension; "
"defaulting --input-separator to comma", args["input"])
else:
args["input_separator"] = "\t"
return args
def _adjust_default_output_separator(args: dict[str, Any]) -> dict[str, Any]:
if args["output_separator"] is None:
args["output_separator"] = args["input_separator"]
return args
[docs]
def cli(argv: list[str] | None = None) -> None:
"""Entry point for running the tabular annotation tool."""
if not argv:
argv = sys.argv[1:]
arg_parser = _build_argument_parser()
args = vars(arg_parser.parse_args(argv))
if args.get("version"):
print(f"GAIn version: {__version__}")
sys.exit(0)
VerbosityConfiguration.set(args)
args = handle_default_args(args)
args = _adjust_default_input_separator(args)
args = _adjust_default_output_separator(args)
# Run inside work_dir so that intermediate files created by worker
# processes (e.g. htslib downloading a remote tabix .tbi index over
# http) land in work_dir instead of the launch directory. Workers
# spawned by process_graph inherit this working directory.
with chdir(args["work_dir"]):
context = build_cli_genomic_context(args)
pipeline = get_pipeline_from_context(context)
grr = get_grr_from_context(context)
assert grr.definition is not None
check_resource_locality(
pipeline,
lambda limit: _count_tabular_rows(args["input"], limit),
allow_remote=args["allow_remote_resources"],
)
ref_genome = context.get_reference_genome()
ref_genome_id = ref_genome.resource_id if ref_genome else None
cache_pipeline_resources(grr, pipeline)
if args.get("reannotate") or args.get("dry_run"):
emit_annotation_plan(args, pipeline, grr)
if args.get("dry_run"):
pipeline.close()
if ref_genome is not None:
ref_genome.close()
maybe_remove_work_dir(args, result=True)
return
args["columns_args"] = {
f"col_{col}": args[f"col_{col}"]
for cols in RECORD_TO_ANNOTATABLE_CONFIGURATION
for col in cols
}
output_path = args["output"]
region_size = args["region_size"]
task_graph = TaskGraph()
if tabix_index_filename(args["input"]) and region_size > 0:
_add_tasks_tabixed(
args,
task_graph,
output_path,
pipeline.raw,
grr.definition,
ref_genome_id,
)
else:
logger.info(
"input %s cannot be split into genomic regions; "
"forcing sequential execution (-j 1)",
args["input"])
args["jobs"] = 1
_add_tasks_plaintext(
args,
task_graph,
output_path,
pipeline.raw,
grr.definition,
ref_genome_id,
)
add_input_files_to_task_graph(args, task_graph)
result = TaskGraphCli.process_graph(task_graph, **args)
pipeline.close()
if ref_genome is not None:
ref_genome.close()
gc.collect()
maybe_remove_work_dir(args, result=result)
def _annotate_tabular_helper(
input_path: str,
pipeline: AnnotationPipeline,
output_path: str,
args: dict[str, Any], *,
reference_genome: ReferenceGenome | None = None,
region: Region | None = None,
attributes_to_delete: Sequence[str] | None = None,
) -> None:
"""Annotate a tabular file using a processing pipeline."""
attributes_to_delete = attributes_to_delete or []
filters: list[Filter] = []
source: Source
batch_size = cast(int, args.get("batch_size", 0))
if batch_size <= 0:
source = _CSVSource(
input_path,
reference_genome,
args["columns_args"],
args["input_separator"],
)
new_header = _build_new_header(
source.header, pipeline.get_attributes(), attributes_to_delete,
reannotation=_is_reannotation(pipeline))
filters.extend([
DeleteAttributesFromAWSFilter(attributes_to_delete),
AnnotationPipelineAnnotatablesFilter(pipeline),
_CSVWriter(output_path, args["output_separator"], new_header),
])
else:
source = _CSVBatchSource(
input_path,
reference_genome,
args["columns_args"],
args["input_separator"],
args["batch_size"],
)
new_header = _build_new_header(
source.header, pipeline.get_attributes(), attributes_to_delete,
reannotation=_is_reannotation(pipeline))
filters.extend([
DeleteAttributesFromAWSBatchFilter(attributes_to_delete),
AnnotationPipelineAnnotatablesBatchFilter(pipeline),
_CSVBatchWriter(output_path, args["output_separator"], new_header),
])
with PipelineProcessor(source, filters) as processor:
processor.process_region(region)
[docs]
def annotate_tabular(
input_path: str,
pipeline: AnnotationPipeline,
output_path: str,
args: dict[str, Any], *,
reference_genome: ReferenceGenome | None = None,
region: Region | None = None,
attributes_to_delete: Sequence[str] | None = None,
) -> None:
"""Annotate a tabular file using a processing pipeline."""
temp_output_path = output_path
if is_compressed_filename(output_path):
temp_output_path = strip_compression_suffix(output_path)
_annotate_tabular_helper(
input_path,
pipeline,
temp_output_path,
args,
reference_genome=reference_genome,
region=region,
attributes_to_delete=attributes_to_delete,
)
if is_compressed_filename(output_path):
# honor the explicit compression suffix (.gz/.bgz)
_tabix_compress(temp_output_path, output_path)
elif is_compressed_filename(input_path):
# uncompressed output name + compressed input: default to .gz
_tabix_compress(temp_output_path)