Source code for lours.dataset.io.caipy

import json
import shutil
from collections.abc import Iterable
from os.path import relpath
from pathlib import Path
from warnings import warn

import numpy as np
import pandas as pd
from imageio.v3 import imread, imwrite
from jsonschema_rs import validator_for
from tqdm.auto import tqdm

from lours.dataset import Dataset
from lours.utils import BBOX_COLUMN_NAMES
from lours.utils.testing import assert_columns_properly_normalized, assert_images_valid

from .common import construct_label_map
from .schema_util import (
    fill_with_dtypes_and_default_value,
    flatten_schema,
    get_enums,
    get_remapping_dict_from_names,
    get_remapping_dict_from_schema,
    load_json_schema,
    remap_dict,
)


def _records_to_dataframe(records: list[dict]) -> pd.DataFrame:
    """Flatten a list of (possibly nested) dicts into a DataFrame indexed by "id".

    Sometimes, one key is either a sub dictionary or nan. As a consequence, the
    column "key" will be full of nans while columns "key.subkey" exist. Columns
    full of NaN are removed to avoid this situation. NaN values remaining in
    object columns are replaced by ``None``.

    Args:
        records: dictionaries to normalize. Each must have an ``"id"`` key.

    Returns:
        The normalized DataFrame, with nested keys joined by "." in column names.
    """
    frame = pd.json_normalize(records).set_index("id").dropna(axis=1, how="all")
    object_columns = frame.select_dtypes("object").columns
    frame[object_columns] = frame[object_columns].replace(np.nan, None)
    return frame


def load_caipy_annot_folder(
    folder_path: Path, schema: dict | None = None
) -> tuple[pd.DataFrame | None, pd.DataFrame | None]:
    """Glob all json in folder path and construct image and annotation dataframe

    Each json file found recursively in ``folder_path`` must contain an
    ``"image"`` dictionary (with at least ``"file_name"`` and ``"id"``) and an
    ``"annotations"`` list. The image ``relative_path`` is built from the location
    of the json file relative to ``folder_path`` and the image ``file_name``.

    Args:
        folder_path: folder where we will search for json files
        schema: Optional JSON schema dict used to check the conformity of loaded
            JSON files. Non-conforming files make the validator raise.

    Raises:
        AssertionError: if a file has a ``"type"`` field different from
            ``"instances"``, or if the annotation file name and the image file
            name differ by more than the extension.

    Returns:
        A pair of dataframes, representing image and annotations data, most
        likely used to construct the dataset object. Each element is ``None``
        when no corresponding data was found.
    """
    annotations = []
    images = []
    annotations_files = list(folder_path.glob("**/*.json"))
    validator = validator_for(schema) if schema is not None else None

    for annot_file in tqdm(annotations_files):
        # JSON is UTF-8 by specification: do not depend on the platform locale.
        with open(annot_file, encoding="utf-8") as f:
            frame_data = json.load(f)
        if validator is not None:
            validator.validate(frame_data)
        # Explicit raises rather than ``assert`` so that the checks are still
        # performed when Python runs with optimizations (-O).
        if "type" in frame_data and frame_data["type"] != "instances":
            raise AssertionError("Only instance type supported for now")
        image_data = frame_data["image"]
        image_file_name = Path(image_data.pop("file_name"))
        if annot_file.stem != image_file_name.stem:
            raise AssertionError(
                "bad formatting, annotation file and image file_name should only"
                " differ by"
                f" the extension, got {annot_file.name} and"
                f" {image_file_name} instead"
            )
        image_path = annot_file.parent.relative_to(folder_path) / image_file_name
        image_data["relative_path"] = image_path
        image_data["type"] = image_path.suffix
        images.append(image_data)
        # Link every annotation of this file to its image
        for annot in frame_data["annotations"]:
            annot["image_id"] = image_data["id"]
        annotations.extend(frame_data["annotations"])

    if images:
        images = _records_to_dataframe(images)
        assert_columns_properly_normalized(images)
    else:
        images = None

    if annotations:
        annotations = _records_to_dataframe(annotations)
        # BBoxes are currently lists of 4 elements in a "bbox" column.
        # Convert each list element in the corresponding column for the dataset
        # object
        bboxes = pd.DataFrame(list(annotations["bbox"]), index=annotations.index)
        annotations = annotations.drop("bbox", axis=1)
        for i, name in enumerate(BBOX_COLUMN_NAMES):
            annotations[name] = bboxes[i]
        assert_columns_properly_normalized(annotations)
    else:
        annotations = None

    return images, annotations
def load_caipy_split(
    images_folder: Path,
    annotations_folder: Path,
    dataset_name: str | None = None,
    split_name: str | None = None,
    schema: dict | None = None,
) -> Dataset:
    """Build a lours Dataset from a single caipy split folder

    Args:
        images_folder: folder where images are stored
        annotations_folder: folder where annotations are stored as json files
        dataset_name: name given to the resulting dataset object. Defaults to
            ``None``.
        split_name: value written in the ``split`` column of the images
            DataFrame. If ``None``, the column is not set. Defaults to ``None``.
        schema: JSON schema dict forwarded to the annotation loader to check the
            conformity of loaded JSON files. If ``None``, no check is requested.
            Defaults to ``None``.

    Raises:
        ValueError: If two images share the same id

    Returns:
        Dataset containing only one split from caipy, expected to be merged with
        other caipy splits
    """
    image_frame, annotation_frame = load_caipy_annot_folder(
        annotations_folder, schema
    )

    if image_frame is not None:
        if image_frame.index.has_duplicates:
            raise ValueError("two images have the same id")
        if split_name is not None:
            image_frame["split"] = split_name

    # The label map can only be deduced when there are annotations to read
    if annotation_frame is None:
        label_map = None
    else:
        label_map = construct_label_map(annotations=annotation_frame)

    return Dataset(
        images_root=images_folder,
        images=image_frame,
        annotations=annotation_frame,
        label_map=label_map,
        dataset_name=dataset_name,
    )
def from_caipy(
    dataset_path: Path | str,
    dataset_name: str | None = None,
    split: str | None = None,
    splits_to_read: str | Iterable[str] | None = None,
    use_schema: bool = False,
    json_schema: dict | str | Path | None = "default",
    booleanize: bool = True,
) -> Dataset:
    """Load a dataset stored in the cAIpy format

    Thin wrapper around :func:`from_caipy_generic`, using the "Images" and
    "Annotations" sub-folders of ``dataset_path``.

    See `specifications`__

    .. __: UPDATE-ME

    This will error if

    - two annotations have the same ``category_id`` but not the same
      ``category_str``
    - two annotations have a different ``category_id`` but the same
      ``category_str``
    - two images have the same ``file_name``, but not the same ``id``

    Args:
        dataset_path: folder root of dataset. Should contain the folders
            "Images" and "Annotations".
        dataset_name: If specified, will be the dataset name, used when showing
            the dataset or exporting in other formats such as fiftyone. If not
            specified, the dataset name will be the name of the root folder.
        split: if data is at the root of Images and Annotations folder, the
            split will be set to this option. Defaults to ``None``
        splits_to_read: if given, will only read the specified splits. Useful
            for a faster loading.
        use_schema: If set to True, and ``json_schema`` is not None, will use
            schema for validation and formatting (see option ``json_schema``)
        json_schema: schema dictionary or Path to a schema that json files will
            be tested against for compliance. If it is not a dictionary, it can
            be either a url, or a local path. If set to None, or ``use_schema``
            is set to False, will not perform any test. Defaults to default
            schema.
        booleanize: In the case some attributes are array of enum with unique
            elements, they will be booleanized (see
            :func:`~lours.dataset.Dataset.booleanize`). Note that this option is
            only used if ``json_schema`` is not None and ``use_schema`` is set
            to True. Defaults to True.

    Raises:
        ValueError: Inconsistency between two annotations or images (see
            description above)

    Returns:
        Loaded dataset object

    See Also:
        - :func:`from_caipy_generic`
        - :ref:`Tutorial on schemas </notebooks/6_demo_schemas.ipynb>`
        - :ref:`Tutorial on booleanization </notebooks/7_demo_booleanize.ipynb>`
        - `cAIpy specifications <UPDATE-ME>`_
    """
    root = Path(dataset_path)
    if dataset_name is None:
        # Fall back on the name of the dataset root folder
        dataset_name = root.name

    return from_caipy_generic(
        images_folder=root / "Images",
        annotations_folder=root / "Annotations",
        dataset_name=dataset_name,
        split=split,
        splits_to_read=splits_to_read,
        use_schema=use_schema,
        json_schema=json_schema,
        booleanize=booleanize,
    )
def from_caipy_generic(
    images_folder: Path | str | None,
    annotations_folder: Path | str,
    dataset_name: str | None = None,
    split: str | None = None,
    splits_to_read: str | Iterable[str] | None = None,
    use_schema: bool = False,
    json_schema: dict | str | Path | None = "default",
    booleanize: bool = True,
) -> Dataset:
    """Load a dataset stored in the cAIpy format from explicit folders

    Contrary to :func:`from_caipy`, images and annotations folders are given
    separately rather than as "Images" and "Annotations" sub-folders of a
    common root. This gives much more flexibility, especially when working
    with predictions and annotations variations.

    The sub-folders "train", "valid" and "eval" (or the ones given in
    ``splits_to_read``) of ``annotations_folder`` are loaded and merged. If
    nothing was loaded this way and ``splits_to_read`` was not given, the
    annotations folder itself is loaded as a single split named ``split``.

    See `specifications`__

    .. __: UPDATE-ME

    This will error if

    - two annotations have the same ``category_id`` but not the same
      ``category_str``
    - two annotations have a different ``category_id`` but the same
      ``category_str``
    - two images have the same ``file_name``, but not the same ``id``

    Args:
        images_folder: folder root of images. If None, a fictive path
            "Images" next to the annotations folder is used.
        annotations_folder: folder root of annotations.
        dataset_name: If specified, will be the dataset name, used when showing
            the dataset or exporting in other formats such as fiftyone.
        split: if data is at the root of Images and Annotations folder, the
            split will be set to this option. Defaults to ``None``
        splits_to_read: if given, will only read the specified splits. Useful
            for a faster loading.
        use_schema: If set to True, and ``json_schema`` is not None, will use
            schema for validation and formatting (see option ``json_schema``)
        json_schema: schema dictionary or Path to a schema that json files will
            be tested against for compliance. If it is not a dictionary, it can
            be either a url or a local path. If set to None, or ``use_schema``
            is set to False, will not perform any test. Defaults to default
            schema.
        booleanize: In the case some attributes are array of enum with unique
            elements, they will be booleanized (see
            :func:`~lours.dataset.Dataset.booleanize`). Note that this option is
            only used if ``json_schema`` is not None and ``use_schema`` is set
            to True. Defaults to True.

    Raises:
        ValueError: Inconsistency between two annotations or images (see
            description above)

    Returns:
        Loaded dataset object

    See Also:
        - :func:`from_caipy`
        - :ref:`Tutorial on schemas </notebooks/6_demo_schemas.ipynb>`
        - :ref:`Tutorial on booleanization </notebooks/7_demo_booleanize.ipynb>`
        - `cAIpy specifications <UPDATE-ME>`_
    """
    # Resolve the schema: already a dict, something to load, or nothing at all
    schema = None
    if use_schema and json_schema is not None:
        schema = (
            json_schema
            if isinstance(json_schema, dict)
            else load_json_schema(json_schema)
        )

    annotations_folder = Path(annotations_folder)
    if images_folder is None:
        images_folder = annotations_folder.parent / "Images"
        print(f"specifying a fictive path for images : {images_folder}")
    else:
        images_folder = Path(images_folder)

    if isinstance(splits_to_read, str):
        splits_to_read = [splits_to_read]
    selected_splits = (
        ["train", "valid", "eval"] if splits_to_read is None else splits_to_read
    )

    # Accumulate every non-empty split found in the annotations folder
    dataset = Dataset(images_root=images_folder, dataset_name=dataset_name)
    for current_split in selected_splits:
        split_annotations = annotations_folder / current_split
        if split_annotations.is_dir():
            loaded_split = load_caipy_split(
                images_folder=images_folder / current_split,
                annotations_folder=split_annotations,
                dataset_name=dataset_name,
                split_name=current_split,
                schema=schema,
            )
            if len(loaded_split) > 0:
                dataset += loaded_split

    # Nothing found in split sub-folders: data is at the root of the folders
    if splits_to_read is None and len(dataset) == 0:
        dataset = load_caipy_split(
            images_folder=images_folder,
            annotations_folder=annotations_folder,
            dataset_name=dataset_name,
            split_name=split,
            schema=schema,
        )

    if schema is None:
        return dataset

    properties = schema["properties"]
    image_schema = properties["image"]
    annotation_schema = properties["annotations"]["items"]
    if booleanize:
        image_enums = get_enums(image_schema)
        annotation_enums = get_enums(annotation_schema)
        all_enums = annotation_enums | image_enums
        dataset = dataset.booleanize(missing_ok=True, **all_enums)
    dataset.images = fill_with_dtypes_and_default_value(image_schema, dataset.images)
    dataset.annotations = fill_with_dtypes_and_default_value(
        annotation_schema, dataset.annotations
    )
    return dataset
def _drop_alpha_channel(image: np.ndarray) -> np.ndarray:
    """Remove the alpha channel of an image array so it can be saved as jpg.

    Args:
        image: image array, either 2D (grayscale, no channel axis) or 3D with
            channels on the last axis.

    Returns:
        The image without alpha channel. 2D arrays are returned untouched:
        slicing their last axis would crop the image width instead of removing
        a channel.
    """
    if image.ndim != 3:
        return image
    if image.shape[-1] == 2:
        # Grayscale + alpha: only keep the gray channel
        return image[..., 0]
    return image[..., :3]


def split_to_caipy(
    dataset: Dataset,
    split_images_folder: Path | None,
    split_annotations_folder: Path,
    schema: dict | None = None,
    copy_images: bool = True,
    to_jpg: bool = True,
    overwrite_images: bool = True,
    overwrite_labels: bool = True,
    flatten_paths: bool = True,
) -> None:
    """Save a particular split to cAIpy.

    images and annotations folder must be given, as it can be the root of
    "Images" and "Annotations", or a subfolder based on split name, e.g.
    "Images/train"

    Note:
        Unless specified otherwise, relative paths of images are flattened
        during the export, which modifies the dataset if the images and
        annotations were stored in subfolders, but will put all images and
        annotations in their respective root folder.

    Note:
        If schema is not given, the nested dictionary will be deduced from
        column names with the separator "."

    Args:
        dataset: dataset object to save. Normally, should be a unique split
        split_images_folder: folder where to save images, either as links or
            files. If None, will not save images. This is useful when you just
            want to save predictions or a variation of annotations.
        split_annotations_folder: folder where to save caipy json files.
        schema: JSON schema dict used to check the conformity of output JSON
            files. It will also be used to remove columns for fields not
            included in the schema. If set to ``None``, will not check the
            conformity. Defaults to ``None``.
        copy_images: If set to False, will create a symbolic link instead of
            copying. Much faster, but needs to keep original images in the same
            relative path. Defaults to True.
        to_jpg: if True, will convert images to jpg if needed. Defaults to True.
        overwrite_images: if set to False, will skip images that are already
            copied. Defaults to True.
        overwrite_labels: if set to False, will skip annotation that are already
            created. Defaults to True.
        flatten_paths: if set to True, will put all files in the root
            Annotations and Images folders by replacing folder separation ("/")
            with "_" in relative path. Defaults to True
    """
    if schema is not None:
        validator = validator_for(schema)
    else:
        validator = None

    # Get back to the list of 4 elements format
    converted_bboxes = pd.Series(
        dataset.annotations[BBOX_COLUMN_NAMES].to_numpy().tolist(),
        index=dataset.annotations.index,
        name="bbox",
    )
    # Remove useless columns and append the new one
    converted_annotations = dataset.annotations.drop(BBOX_COLUMN_NAMES, axis=1)
    converted_annotations = pd.concat([converted_annotations, converted_bboxes], axis=1)
    converted_annotations = converted_annotations.reset_index()

    # Group annotations per image once, instead of filtering the whole
    # annotations table for every image (quadratic in the dataset size).
    annotations_per_image = dict(
        iter(converted_annotations.groupby("image_id", sort=False, observed=True))
    )
    no_annotations = converted_annotations.iloc[:0]

    n_images = len(dataset.images)
    image_attributes = dataset.get_image_attributes()

    if schema is None:
        image_remapping_dict = get_remapping_dict_from_names(
            frozenset([*image_attributes, "file_name", "id", "width", "height"])
        )
        annotations_remapping_dict = get_remapping_dict_from_names(
            frozenset(
                [
                    *dataset.get_annotations_attributes(),
                    "id",
                    "bbox",
                    "category_id",
                    "category_str",
                ]
            )
        )
    else:
        image_schema = schema["properties"]["image"]
        annotations_schema = schema["properties"]["annotations"]["items"]

        image_remapping_dict = get_remapping_dict_from_schema(image_schema)
        saved_image_keys = flatten_schema(image_schema)
        lost_image_columns = set(image_attributes) - set(saved_image_keys)
        if lost_image_columns:
            warn(
                "These column in self.images will be lost because they don't follow"
                f" the specified json schema: {', '.join(lost_image_columns)}",
                RuntimeWarning,
            )

        annotations_remapping_dict = get_remapping_dict_from_schema(annotations_schema)
        saved_annot_keys = flatten_schema(annotations_schema)
        lost_annot_columns = set(dataset.get_annotations_attributes()) - set(
            saved_annot_keys
        )
        if lost_annot_columns:
            warn(
                "These column in self.annotations will be lost because they don't"
                " follow the specified json schema:"
                f" {', '.join(lost_annot_columns)}",
                RuntimeWarning,
            )

    for image_id, image_data in tqdm(dataset.images.iterrows(), total=n_images):
        assert isinstance(image_id, int)
        # Get only annotations associated with this image
        instances = annotations_per_image.get(image_id, no_annotations)
        input_image_path = (dataset.images_root / image_data["relative_path"]).resolve()

        # Handle the case of images coming from a cAIpy, which already have the
        # structure {split}/filename in their relative path, which we don't want
        output_relative_path = image_data["relative_path"]
        if output_relative_path.parts[0] in ["train", "valid", "eval"]:
            output_relative_path = Path(*output_relative_path.parts[1:])
        if flatten_paths:
            output_relative_path = Path("_".join(output_relative_path.parts))
        output_filename = output_relative_path.name

        if split_images_folder is not None:
            output_image_path = (split_images_folder / output_relative_path).resolve()
            output_image_path.parent.mkdir(parents=True, exist_ok=True)
            if to_jpg:
                output_image_path = output_image_path.with_suffix(".jpg")
                output_filename = output_image_path.name
            # Copy image to destination. Do nothing if the image already exists
            # and must not be overwritten.
            if overwrite_images or not output_image_path.is_file():
                if to_jpg and image_data["type"].lower() not in [".jpg", ".jpeg"]:
                    image = imread(input_image_path)
                    imwrite(output_image_path, _drop_alpha_channel(image))
                elif copy_images:
                    shutil.copy(input_image_path, output_image_path)
                else:
                    # Replace a possibly existing file or link by a relative link
                    output_image_path.unlink(missing_ok=True)
                    output_image_path.symlink_to(
                        relpath(input_image_path, output_image_path.parent)
                    )

        output_json_path = split_annotations_folder / output_relative_path.with_suffix(
            ".json"
        )
        output_json_path.parent.mkdir(parents=True, exist_ok=True)
        if not output_json_path.is_file() or overwrite_labels:
            image_dict = {
                "file_name": output_filename,
                "id": int(image_id),
                "width": int(image_data["width"]),
                "height": int(image_data["height"]),
                **image_data[image_attributes],
            }
            image_dict = remap_dict(image_dict, image_remapping_dict)
            annot_list = [
                remap_dict(record, annotations_remapping_dict)
                for record in instances.to_dict("records")
            ]
            annotations_dict = {
                "image": image_dict,
                "type": "instances",
                "annotations": annot_list,
            }
            if validator is not None:
                validator.validate(annotations_dict)
            with open(output_json_path, "w", encoding="utf-8") as f:
                json.dump(annotations_dict, f, indent=2)
def dataset_to_caipy(
    dataset: Dataset,
    output_path: Path | str,
    use_schema: bool = False,
    json_schema: str | Path | None = "default",
    copy_images: bool = True,
    to_jpg: bool = True,
    overwrite_images: bool = True,
    overwrite_labels: bool = True,
    flatten_paths: bool = True,
) -> None:
    """Save dataset to cAIpy format

    Thin wrapper around :func:`dataset_to_caipy_generic`, saving images in
    ``output_path / "Images"`` and annotations in
    ``output_path / "Annotations"``.

    Note that depending on the splits present in your dataset, the folder
    structure might change

    Note:
        Unless specified otherwise, relative paths of images are flattened
        during the export, which modifies the dataset if the images and
        annotations were stored in subfolders, but will put all images and
        annotations of a particular split in their respective root folder.

    Note:
        If schema is not given, the nested dictionary will be deduced from
        column names with the separator "."

    Args:
        dataset: dataset to save
        output_path: root folder where the dataset folder structure will be
            created.
        use_schema: If set to True, and ``json_schema`` is not None, will use
            schema for validation and formatting (see option ``json_schema``)
        json_schema: Path to a schema that output json dicts will be tested
            against for compliance. They will also be used to remove columns for
            fields not included in the schema. Can be either a url or a local
            path. If set to None, or ``use_schema`` is set to False, will not
            perform any test or reformatting. Defaults to default schema.
        copy_images: If set to False, will create a symbolic link instead of
            copying. Much faster, but needs to keep original images in the same
            relative path. Defaults to True.
        to_jpg: if True, will convert images to jpg if needed. Defaults to True.
        overwrite_images: if set to False, will skip images that are already
            copied. Defaults to True.
        overwrite_labels: if set to False, will skip annotation that are already
            created. Defaults to True.
        flatten_paths: if set to True, will put all files in the root
            Annotations and Images folders by replacing folder separation ("/")
            with "_" in relative path. Defaults to True
    """
    root = Path(output_path)
    return dataset_to_caipy_generic(
        dataset=dataset,
        output_images_folder=root / "Images",
        output_annotations_folder=root / "Annotations",
        use_schema=use_schema,
        json_schema=json_schema,
        copy_images=copy_images,
        to_jpg=to_jpg,
        overwrite_images=overwrite_images,
        overwrite_labels=overwrite_labels,
        flatten_paths=flatten_paths,
    )
def dataset_to_caipy_generic(
    dataset: Dataset,
    output_images_folder: Path | str | None,
    output_annotations_folder: Path | str,
    use_schema: bool = False,
    json_schema: dict | str | Path | None = "default",
    copy_images: bool = True,
    to_jpg: bool = True,
    overwrite_images: bool = True,
    overwrite_labels: bool = True,
    flatten_paths: bool = True,
) -> None:
    """Save dataset to cAIpy format

    Note that depending on the splits present in your dataset, the folder
    structure might change: if the dataset has known splits, each one is saved
    in a sub-folder named after the split. Otherwise, everything is saved at
    the root of the output folders.

    Notes:
        - Unless specified otherwise, relative paths of images are flattened
          during the export, which modifies the dataset if the images and
          annotations were stored in subfolders, but will put all images and
          annotations of a particular split in their respective root folder.
        - If schema is not given, the nested dictionary will be deduced from
          column names with the separator "."

    Args:
        dataset: dataset to save
        output_images_folder: root folder where the images will be saved. If
            None, will not save images. Useful when only saving predictions or
            a variation of annotations.
        output_annotations_folder: root folder where the json file will be
            saved.
        use_schema: If set to True, and ``json_schema`` is not None, will use
            schema for validation and formatting (see option ``json_schema``)
        json_schema: schema dictionary or Path to a schema that output json
            dicts will be tested against for compliance. It will also be used
            to remove columns for fields not included in the schema. If it is
            not a dictionary, it can be either a url or a local path. If set to
            None, or ``use_schema`` is set to False, will not perform any test.
            Defaults to the example schema.
        copy_images: If set to False, will create a symbolic link instead of
            copying. Much faster, but needs to keep original images in the same
            relative path. Defaults to True.
        to_jpg: if True, will convert images to jpg if needed. Defaults to True.
        overwrite_images: if set to False, will skip images that are already
            copied. Defaults to True.
        overwrite_labels: if set to False, will skip annotation that are already
            created. Defaults to True.
        flatten_paths: if set to True, will put all files in the root
            Annotations and Images folders by replacing folder separation ("/")
            with "_" in relative path. Defaults to True

    Raises:
        ValueError: if images must be saved but the dataset images are not valid
            (e.g. wrong images root folder).
        AssertionError: if the dataset has both images with a known split and
            images with an unknown (null) split.
    """
    if output_images_folder is not None:
        # Images will be read from disk: check they exist before writing anything
        try:
            assert_images_valid(dataset, load_images=False)
        except AssertionError as e:
            raise ValueError(
                "Dataset images are missing, check that the images root folder is the"
                " right one"
            ) from e

    if use_schema and json_schema is not None:
        # Accept an already loaded schema, like ``from_caipy_generic`` does
        if isinstance(json_schema, dict):
            schema = json_schema
        else:
            schema = load_json_schema(json_schema)
    else:
        schema = None

    output_dataset = dataset.debooleanize()
    if "split" in dataset.images.columns:
        splits = dataset.images["split"].unique().tolist()
    else:
        splits = []

    output_annotations_folder = Path(output_annotations_folder)
    if output_images_folder is not None:
        output_images_folder = Path(output_images_folder)
        output_images_folder.mkdir(exist_ok=True, parents=True)
    output_annotations_folder.mkdir(exist_ok=True, parents=True)

    # If there is no split column, or if the only split is a null value,
    # save the dataset without split subfolders
    if not splits or (len(splits) == 1 and pd.isna(splits[0])):
        print("Saving cAIpy dataset without split")
        split_to_caipy(
            dataset=output_dataset,
            split_images_folder=output_images_folder,
            split_annotations_folder=output_annotations_folder,
            schema=schema,
            copy_images=copy_images,
            to_jpg=to_jpg,
            overwrite_images=overwrite_images,
            overwrite_labels=overwrite_labels,
            flatten_paths=flatten_paths,
        )
        return

    if any(pd.isna(split) for split in splits):
        raise AssertionError(
            "Dataset cannot have both data with known split and with unknown split"
        )

    for split_name in splits:
        print(f"Saving {split_name} split...")
        if output_images_folder is not None:
            split_images_folder = output_images_folder / f"{split_name}"
            split_images_folder.mkdir(exist_ok=True)
        else:
            split_images_folder = None
        split_annotations_folder = output_annotations_folder / f"{split_name}"
        split_annotations_folder.mkdir(exist_ok=True)
        split_to_caipy(
            dataset=output_dataset.get_split(split_name),
            split_images_folder=split_images_folder,
            split_annotations_folder=split_annotations_folder,
            schema=schema,
            copy_images=copy_images,
            to_jpg=to_jpg,
            overwrite_images=overwrite_images,
            overwrite_labels=overwrite_labels,
            flatten_paths=flatten_paths,
        )