Source code for lours.utils.parquet_saver

from collections.abc import Iterable
from pathlib import Path
from shutil import rmtree

import pandas as pd
from yaml import safe_dump, safe_load

from lours import __version__


def dict_to_parquet(
    output_dict: dict,
    output_dir: Path,
    version: str = __version__,
    fields: Iterable[str] | None = None,
    fields_to_str: Iterable[str] = ("images_root", "relative_path"),
    overwrite: bool = False,
) -> None:
    """Save a dictionary containing dataframes as a yaml file and parquet files.

    Every :class:`pandas.DataFrame` found in the dictionary (at any nesting level)
    is written to its own ``.pq`` file in ``output_dir`` and replaced in the
    metadata by the placeholder string ``"DataFrame:<file name>"``. Nested
    dictionaries are converted recursively, the nesting path being encoded in the
    parquet file name as a dot-separated prefix. Everything else is written as is
    (or as ``str`` for the names listed in ``fields_to_str``) to
    ``metadata.yaml``, along with a ``"version"`` entry.

    Args:
        output_dict: dictionary to save, containing yaml serializable objects and
            dataframes. Can be nested.
        output_dir: path to folder where to save the yaml and parquets files
        version: data version info for future compatibility. Stored under the
            ``"version"`` key of the metadata, replacing any entry of the same
            name. Defaults to current Lours version.
        fields: fields to save. Will ignore other fields in the output
            dictionary. The filter is applied to the keys of nested dictionaries
            as well. If set to None, will save all fields. Defaults to None.
        fields_to_str: fields to convert to str. Applies both to dataframe
            columns and to plain dictionary values with that name. Useful for
            non-serializable objects like Path
        overwrite: if set to True, will remove the ``output_dir`` directory if it
            already exists. If set to False, will check that the directory either
            does not exist or is empty. Defaults to False

    Raises:
        OSError: Raised when the output directory is not empty and ``overwrite``
            is set to False
    """
    output_dir = Path(output_dir)

    # ``fields`` and ``fields_to_str`` are looked up once per key and once per
    # dataframe, recursion included. Materialize them a single time so that
    # one-shot iterables (generators, iterators) are not exhausted after the
    # first lookup. A lone string is treated as one name, not as characters.
    if fields is None:
        selected_fields = None
    elif isinstance(fields, str):
        selected_fields = {fields}
    else:
        selected_fields = set(fields)
    if isinstance(fields_to_str, str):
        str_fields = {fields_to_str}
    else:
        str_fields = set(fields_to_str)

    if overwrite:
        # Best-effort removal: if something could not be deleted, the mkdir
        # below fails because the directory still exists.
        rmtree(output_dir, ignore_errors=True)
        output_dir.mkdir(parents=True)
    elif output_dir.is_dir():
        if any(output_dir.iterdir()):
            raise OSError("Output directory must be empty")
    else:
        output_dir.mkdir(parents=True)

    def replace_dataframes_in_dict(dict_to_convert: dict, prefix: str) -> dict:
        """Write dataframes to parquet and return a yaml-friendly copy."""
        converted_dict = {}
        for name, attribute in dict_to_convert.items():
            if selected_fields is not None and name not in selected_fields:
                continue
            if isinstance(attribute, pd.DataFrame):
                parquet_name = f"{prefix}{name}.pq"
                output_parquet_file = output_dir / parquet_name
                converted_dict[name] = f"DataFrame:{parquet_name}"
                # Only cast the columns that are actually present.
                attribute.astype(
                    {
                        field: str
                        for field in str_fields
                        if field in attribute.columns
                    }
                ).to_parquet(output_parquet_file)
            elif isinstance(attribute, dict):
                converted_dict[name] = replace_dataframes_in_dict(
                    attribute, f"{prefix}{name}."
                )
            else:
                converted_dict[name] = (
                    attribute if name not in str_fields else str(attribute)
                )
        return converted_dict

    metadata = replace_dataframes_in_dict(output_dict, "")
    metadata["version"] = version
    with open(output_dir / "metadata.yaml", "w", encoding="utf-8") as yaml_file:
        safe_dump(metadata, yaml_file)
def dict_from_parquet(
    input_dir: Path,
    fields_to_path: Iterable[str] = ("relative_path", "images_root"),
) -> dict:
    """Create dictionary from folder created with the function :func:`.dict_to_parquet`

    The ``metadata.yaml`` file of ``input_dir`` is loaded, and every string value
    of the form ``"DataFrame:<file name>"`` (at any nesting level) is replaced by
    the dataframe read from the corresponding parquet file in ``input_dir``.

    Args:
        input_dir: folder containing yaml and parquet files
        fields_to_path: Iterable of strings to specify which columns will need to
            be converted to Path objects. Top-level metadata entries with one of
            these names are converted to Path as well; entries of nested
            dictionaries are left untouched.

    Returns:
        created dictionary. Will be used to reconstruct objects with dataframes,
        such as Dataset or Evaluator.
    """
    input_dir = Path(input_dir)

    # ``fields_to_path`` is iterated once per loaded dataframe and once more for
    # the top-level metadata: materialize it so that a one-shot iterable is not
    # exhausted after the first pass. A lone string is treated as one name.
    if isinstance(fields_to_path, str):
        path_fields = (fields_to_path,)
    else:
        path_fields = tuple(fields_to_path)

    placeholder_prefix = "DataFrame:"

    def replace_dataframe_placeholders(input_dict: dict) -> None:
        """Replace dataframe placeholders by loaded dataframes, in place."""
        # Only values of existing keys are reassigned, so the dictionary size
        # does not change while it is being iterated.
        for name, attribute in input_dict.items():
            if isinstance(attribute, str) and attribute.startswith(
                placeholder_prefix
            ):
                # Strip the prefix only: the file name itself may contain ":"
                # when the original dictionary key did.
                parquet_path = input_dir / attribute.removeprefix(placeholder_prefix)
                loaded_dataframe = pd.read_parquet(parquet_path)
                for column in path_fields:
                    if column in loaded_dataframe.columns:
                        loaded_dataframe[column] = loaded_dataframe[column].apply(
                            Path  # pyright: ignore
                        )
                input_dict[name] = loaded_dataframe
            elif isinstance(attribute, dict):
                replace_dataframe_placeholders(attribute)

    with open(input_dir / "metadata.yaml", encoding="utf-8") as yaml_file:
        metadata = safe_load(yaml_file)

    for field in path_fields:
        if field in metadata:
            metadata[field] = Path(metadata[field])
    replace_dataframe_placeholders(metadata)
    return metadata