import logging
from typing import cast
from urllib.parse import urlparse

import pyarrow as pa
import pyarrow.fs
import pyarrow.parquet as pq
from fsspec.core import url_to_fs
from fsspec.spec import AbstractFileSystem

logger = logging.getLogger(__name__)


def url_to_pyarrow_fs(
    filename: str,
    filesystem: AbstractFileSystem | None = None,
) -> tuple[pa.fs.FileSystem | None, str]:
    """Turn a URL into a pyarrow filesystem instance and a path.

    Parameters
    ----------
    filename : str
        The fsspec-compatible URL.
    filesystem : fsspec.spec.AbstractFileSystem, optional
        An fsspec filesystem for ``filename``.

    Returns
    -------
    tuple[pyarrow.fs.FileSystem | None, str]
        The filesystem discovered from ``filename`` and ``filesystem``
        (``None`` for plain local paths) and the path to use with it.
    """
    if isinstance(filesystem, pa.fs.FileSystem):
        return filesystem, filename
    if filesystem is None:
        if urlparse(filename).scheme:
            fsspec_fs, path = url_to_fs(filename)
            pa_fs = pa.fs.PyFileSystem(pa.fs.FSSpecHandler(fsspec_fs))
            return pa_fs, path
        return None, filename
    pa_fs = pa.fs.PyFileSystem(pa.fs.FSSpecHandler(filesystem))
    return pa_fs, filename
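
# A minimal usage sketch (illustrative only; the bucket and paths below are
# placeholders).  The helper returns a ``(filesystem, path)`` pair: ``None``
# plus the original path for plain local paths, or a pyarrow wrapper around
# the fsspec filesystem for URLs such as ``s3://``:
#
#     pa_fs, path = url_to_pyarrow_fs("s3://bucket/data/part-0.parquet")
#     if pa_fs is None:
#         pa_fs = pa.fs.LocalFileSystem()
#     with pa_fs.open_input_file(path) as source:
#         table = pq.read_table(source)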


def merge_parquets(
    in_files: list[str], out_file: str,
    *,
    delete_in_files: bool = True,
    row_group_size: int = 50_000,
    parquet_version: str | None = None,
) -> None:
    """Merge ``in_files`` into one large file called ``out_file``."""
    try:
        _try_merge_parquets(
            in_files, out_file,
            delete_in_files=delete_in_files,
            row_group_size=row_group_size,
            parquet_version=parquet_version)
    except Exception:  # pylint: disable=broad-except
        # Unsuccessful conversion; remove the partially generated file.
        fs, path = url_to_fs(out_file)
        if fs.exists(path):
            fs.rm_file(path)
        raise
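
# Example call (hypothetical file names): merge per-chunk outputs into a
# single parquet file while keeping the chunk files on disk:
#
#     merge_parquets(
#         ["chunks/part-000.parquet", "chunks/part-001.parquet"],
#         "merged.parquet",
#         delete_in_files=False,
#     )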


def _merge_flush_batches(
    batches: list[pa.RecordBatch],
    out_parquet: pq.ParquetWriter,
) -> None:
    if len(batches) == 0:
        return
    # Combine the accumulated record batches into a table and write it out.
    table = pa.Table.from_batches(batches)
    out_parquet.write_table(table)


def _collect_in_files_compression(
    in_files: list[str],
) -> str | dict[str, str]:
    # Default to SNAPPY; when an input chunk with at least one row group is
    # available, collect per-column compression codecs from its first row
    # group instead.
    compression: str | dict[str, str] = "SNAPPY"
    if len(in_files) > 0:
        for in_file in in_files:
            fs, path = url_to_fs(in_file)
            if not fs.exists(path):
                continue
            parq_file = pq.ParquetFile(path)
            if parq_file.metadata.num_row_groups == 0:
                continue
            compression = {}
            row_group = parq_file.metadata.row_group(0)
            schema = parq_file.schema_arrow
            for name in schema.names:
                for index in range(row_group.num_columns):
                    column = row_group.column(index)
                    if column.path_in_schema != name:
                        continue
                    column_compression = column.compression
                    if column_compression.upper() == "UNCOMPRESSED":
                        continue
                    compression[name] = column_compression
    return compression
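
# Note: ``pq.ParquetWriter`` accepts either a single codec name or a
# per-column mapping for ``compression``; the helper above builds such a
# mapping from the input chunks.  A sketch with hypothetical column names:
#
#     compression = {"chrom": "ZSTD", "position": "GZIP"}
#     writer = pq.ParquetWriter("out.parquet", schema, compression=compression)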


def _try_merge_parquets(
    in_files: list[str], out_file: str,
    *,
    delete_in_files: bool,
    row_group_size: int = 50_000,
    parquet_version: str | None = None,
) -> None:
    assert len(in_files) > 0

    out_parquet = None
    compression = _collect_in_files_compression(in_files)
    batches = []
    batches_row_count = 0
    for in_file in in_files:
        in_fs, in_fn = url_to_pyarrow_fs(in_file)
        if in_fs is None:
            in_fs = pa.fs.LocalFileSystem()
        try:
            with in_fs.open_input_file(in_fn) as parquet_native_file:
                parq_file = pq.ParquetFile(parquet_native_file)
                if out_parquet is None:
                    # Create the output writer lazily, using the schema of
                    # the first readable chunk.
                    out_filesystem, out_filename = url_to_pyarrow_fs(out_file)
                    out_parquet = pq.ParquetWriter(
                        out_filename, parq_file.schema_arrow,
                        filesystem=out_filesystem,
                        compression=compression,
                        version=parquet_version,
                    )
                for batch in parq_file.iter_batches():
                    batches.append(batch)
                    batches_row_count += batch.num_rows
                    if batches_row_count >= row_group_size:
                        _merge_flush_batches(batches, out_parquet)
                        batches = []
                        batches_row_count = 0
        except pa.ArrowInvalid:
            logger.exception(
                "invalid chunk parquet file: %s", in_file)
            raise
        except FileNotFoundError:
            logger.warning(
                "missing chunk parquet file: %s", in_file)

    # Flush any remaining batches and close the output writer.
    _merge_flush_batches(batches, out_parquet)
    batches = []
    batches_row_count = 0
    cast(pq.ParquetWriter, out_parquet).close()

    if delete_in_files:
        for in_file in in_files:
            try:
                fs, path = url_to_fs(in_file)
                fs.rm_file(path)
            except FileNotFoundError:  # noqa: PERF203
                logger.warning("missing chunk parquet file: %s", in_file)