import logging
import re
import uuid
from typing import Any, cast
from dae.utils import fs_utils
[docs]
class FsspecHandler(logging.StreamHandler):
"""Class to create fsspec based logging handler."""
def __init__(self, logfile: str):
fs, logpath = fs_utils.url_to_fs(logfile)
stream = fs.open(logpath, "w")
super().__init__(stream=stream)
[docs]
def close(self) -> None:
"""Close the stream.
Copied from logging.FileHandler.close().
"""
self.acquire()
try:
try:
if self.stream:
try:
self.flush()
finally:
stream = self.stream
self.stream = None
stream.close()
finally:
# Issue #19523: call unconditionally to
# prevent a handler leak when delay is set
# Also see Issue #42378: we also rely on
# self._closed being set to True there
logging.StreamHandler.close(self)
finally:
self.release()
[docs]
def ensure_log_dir(**kwargs: Any) -> str:
"""Ensure logging directory exists."""
log_dir = kwargs.get("log_dir")
if log_dir is not None:
log_dir = fs_utils.abspath(log_dir)
if not fs_utils.exists(log_dir):
fs, path = fs_utils.url_to_fs(log_dir)
fs.mkdir(path, exists_ok=True)
return cast(str, log_dir)
RE_TASK_ID = re.compile(r"[\. /,()\-:;]")
[docs]
def safe_task_id(task_id: str) -> str:
result = RE_TASK_ID.sub("_", task_id)
if len(result) <= 200:
return result
result = result[:150]
return f"{result}_{uuid.uuid1()}"