Source code for lours.utils.annotations_appender

from collections import defaultdict
from collections.abc import Sequence
from typing import Any, TypeVar

import numpy as np
import pandas as pd
from numpy import ndarray

from ..dataset import Dataset
from . import BBOX_COLUMN_NAMES
from .bbox_converter import import_bbox
from .label_map_merger import merge_label_maps

*_, WIDTH_NAME, HEIGHT_NAME = BBOX_COLUMN_NAMES

D = TypeVar("D", bound=Dataset)
Scalar = float | str | bool
ScalarSequence = Sequence[float] | Sequence[str] | Sequence[bool]


[docs] def broadcast_annotations( image_id: int | Sequence[int] | ndarray, bbox_coordinates: ndarray | Sequence[float] | Sequence[Sequence[float]], category_id: int | ndarray | Sequence[int], confidence: float | Sequence[float] | ndarray | None = None, **other: ndarray | Scalar | ScalarSequence, ) -> dict[str, np.ndarray]: """Broadcast together detection annotation attributes. Every attribute except ``bbox_coordinates`` can be only 1 element, and will be duplicated to match the length of ``bbox_coordinates``. If its more than 1 element, the length must match the length of ``bbox_coordinates`` Args: image_id: id or 1D array of shapes [N] of ids of image corresponding to each annotation to be appended bbox_coordinates: 2D numpy array of shape [N, 4] or [N, 2] (for a keypoint) corresponding to the bounding box coordinates. Can also be of the shape [4] or [2] if there is only one bounding box/keypoint category_id: int value or 1D numpy array of shape [N] corresponding to the category of each bounding box confidence: when dealing with the special case of predictions, 1D numpy array of shape [N] corresponding to the confidence of the prediction. None otherwise. Defaults to None. **other: Other possible fields to the annotation Raises: ValueError: Will be raised if the bbox_coordinates shape is not [N, 4] [N, 2] or [2] or [4] ValueError: Will be raised if the values to broadcast are not scalar or array of size [1] or size [N] Returns: dictionary with the annotation attribute name as keys ("image_id", "category_id", etc) and the broadcast of values as numpy arrays as values. """ if not isinstance(bbox_coordinates, ndarray): bbox_coordinates = np.array(bbox_coordinates) bbox_shape = bbox_coordinates.shape if len(bbox_shape) == 1: bbox_coordinates = bbox_coordinates[None] if (len(bbox_coordinates.shape) != 2) or (bbox_coordinates.shape[1] not in [2, 4]): raise ValueError( "Error for bbox_coordinates, expected shape of form [4], [2], [N, 4]" f" or [N, 2] got {bbox_shape}" ) n_annot = bbox_coordinates.shape[0] def broadcast_scalar( value_to_broadcast: Scalar | ScalarSequence | np.ndarray, key_name: str, ) -> np.ndarray: if np.ndim(value_to_broadcast) == 0: broadcasted = np.tile(value_to_broadcast, n_annot) elif np.ndim(value_to_broadcast) == 1: value_to_broadcast_np = np.array(value_to_broadcast) if value_to_broadcast_np.shape[0] == 1: broadcasted = np.tile(value_to_broadcast_np, n_annot) elif value_to_broadcast_np.shape[0] == n_annot: broadcasted = value_to_broadcast_np else: broadcasted = None else: broadcasted = None if broadcasted is None: raise ValueError( f"Value to broadcast {key_name} can only be either a scalr of a 1D" f" dimensional array of length 1 or {n_annot}" ) return broadcasted result = {} result["image_id"] = broadcast_scalar(image_id, "image_id") result["bbox_coordinates"] = bbox_coordinates result["category_id"] = broadcast_scalar(category_id, "category_id") if confidence is not None: if n_annot == 1: result["confidence"] = broadcast_scalar(confidence, "confidence") else: confidence = np.array(confidence) if confidence.shape != (n_annot,): raise ValueError( "Confidence field can only be the same length as bounding boxes." f" Expected length of {n_annot}, got shape of {confidence.shape}" ) result["confidence"] = confidence for key, value in other.items(): result[key] = broadcast_scalar(value, key) return result
[docs] def add_detection_annotation( input_dataset: D, image_id: int | Sequence[int] | ndarray, bbox_coordinates: Sequence[float] | Sequence[Sequence[float]] | ndarray, format_string: str, category_id: ndarray | int | Sequence[int], inplace: bool = False, label_map: dict[int, str] | None = None, category_ids_mapping: dict[int, int] | None = None, confidence: float | ndarray | Sequence[float] | None = None, **other_columns: Scalar | ScalarSequence | ndarray, ) -> D: """Add one or multiple detection annotations to the current dataset. In the case of a single annotation, every option can be a single value, but in the case of multiple annotations, every option needs to be an array of such values, and every array needs to be the same length. Note: In additions to the following options, you can add other fields as well, with keyword arguments. Args: input_dataset: Dataset to which we want to add new annotations image_id: image identifier to link each detection to the corresponding image bbox_coordinates: list of coordinates for the bounding box. Can follow any compatible format, as long as it is given in the next format format_string: format of coordinates, whether coordinates are relatives, using corner points of the box, box dimensions, etc. See :func:`.import_bbox` for more info category_id: category of each detection. Label will be deduced from dataset's label map inplace: if set to True, will modify the dataset in place and return itself. Else, will return a modified Dataset. Defaults to False. label_map: In the case the current dataset's label map is incomplete, merge it with this new label map. current label map and new label map must be compatible, see :func:`.merge_label_maps`. Defaults to None. category_ids_mapping: Optional dictionary to map annotated category ids into the right ids. This is useful for example when a neural network can only use a contiguous label map. confidence: Optional field for confidence, in the case annotations are actually predictions. Must the same length as bbox_coordinates. In the case of a single prediction, can also be a float. Defaults to None. **other_columns: Other column representing custom fields. Raises: ValueError: raised when giving numpy arrays are not the same number of elements, or if the bounding box coordinates is not of the shape either [4], or [N, 4] Returns: If ``inplace`` is False, new dataset object with appended annotations. Otherwise, the updated ``input_dataset``. """ if label_map is not None: label_map = merge_label_maps(input_dataset.label_map, label_map, method="outer") else: label_map = input_dataset.label_map columns = broadcast_annotations( image_id=image_id, bbox_coordinates=bbox_coordinates, category_id=category_id, confidence=confidence, **other_columns, ) bbox_coordinates = columns.pop("bbox_coordinates") for name, column in columns.items(): if column.ndim != 1: raise ValueError( "Field as arrays can only be 1-dimensional, but got a shape of" f" {column.shape} for column {name}" ) if bbox_coordinates.shape[1] not in [2, 4]: raise ValueError( "Error for bbox_coordinates, expected an array of 4 or 2 columns, got" f" shape of {bbox_coordinates.shape}" ) annotations_df = pd.DataFrame(columns) if input_dataset.len_annot() > 0: first_index = input_dataset.annotations.index.max() + 1 annotations_df.index += first_index if category_ids_mapping is not None: annotations_df["category_id"] = annotations_df["category_id"].replace( category_ids_mapping ) bbox_df = import_bbox( bounding_boxes=bbox_coordinates, images_df=input_dataset.images, image_ids=annotations_df["image_id"], input_format=format_string, ) bbox_df.index = annotations_df.index annotations_df = pd.concat([annotations_df, bbox_df], axis=1) if ("area" in input_dataset.annotations.columns) and ( "area" not in annotations_df.columns ): annotations_df["area"] = bbox_df[WIDTH_NAME] * bbox_df[HEIGHT_NAME] annotations_df = pd.concat([input_dataset.annotations, annotations_df]) if inplace: input_dataset.label_map = label_map input_dataset.annotations = annotations_df input_dataset.init_annotations() return input_dataset else: return input_dataset.from_template( annotations=annotations_df, label_map=label_map )
[docs] class AnnotationAppender: """Context manager to easily add detection tensors, as if the Dataset was a list after the appending is finished, the appender construct big numpy arrays to concatenate to the dataset's annotations dataframe """ def __init__( self, dataset: Dataset, format_string: str = "XYWH", category_ids_mapping: dict[int, int] | None = None, label_map: dict[int, str] | None = None, ): """Main constructor. Args: dataset: dataset the annotations will be appended to. format_string: String describing bounding box format. Defaults to "XYWH". category_ids_mapping: Optional dictionary to map annotated category ids into the right ids. This is useful for example when a neural network can only use a contiguous label map. label_map: Optional dictionary to provide ``category_id -> ``category_str`` mapping for additional categories in appended annotations. Defaults to None. """ self.dataset = dataset self.image_ids = set(dataset.images.index) self.format_string = format_string self.label_map = label_map self.categroy_ids_mapping = category_ids_mapping self.reset()
[docs] def reset(self): """Creates an empty dictionary that will be fed new annotations.""" self.annotations_to_append = defaultdict(dict) self.i = 0 self.index = {}
def __enter__(self): """Function called at the beginning of ``with`` context. Returns: self class, with the ``append``method. """ self.reset() return self
[docs] def append( self, image_id: int | Sequence[int] | ndarray, bbox_coordinates: ndarray | Sequence[float], category_id: int | Sequence[int] | ndarray, confidence: ndarray | float | Sequence[float] | None = None, **other: Scalar | ScalarSequence | ndarray, ) -> None: """Append annotations for a particular image id. Everything except image id is expected to be a numpy array. Note that in addition to the regular bounding boxes coordinates, category and confidence, you can add other fields as long as they are numpy array of the same length. If no column exist in the dataset's annotations dataframe, it will be created (setting the already existing annotations to NaN for this column) Args: image_id: id or 1D array of shapes [N] of ids of image corresponding to each annotation to be appended bbox_coordinates: 2D numpy array of shape [N, 4] corresponding to the bounding box coordinates category_id: hashable value or 1D numpy array of shape [N] corresponding to the category of each bounding box confidence: when dealing with the special case of predictions, 1D numpy array of shape [N] corresponding to the confidence of the prediction, which can also be a float if N == 1. None otherwise. Defaults to None. **other: Other possible fields to the annotation Raises: ValueError: raised when the given numpy arrays are not of the same length, or if bounding box coordinates are not a 2D array with 4 columns. ValueError: raised when the given image id is not present in the dataset's image dataframe """ ids_to_check = [image_id] if isinstance(image_id, int) else image_id for i in ids_to_check: if i not in self.image_ids: raise ValueError(f"Image id {i} is not in dataset's images dataframe") broadcasted_annotations = broadcast_annotations( image_id=image_id, bbox_coordinates=bbox_coordinates, category_id=category_id, confidence=confidence, **other, ) n_annot = len(broadcasted_annotations["image_id"]) # Note that we need index because not all append calls have the same keywords # As such, the index can help up know which rows should have actual value, # and which rows should have pd.NA self.index[self.i] = np.arange(self.i, self.i + n_annot) for key, value in broadcasted_annotations.items(): self.annotations_to_append[key][self.i] = value self.i += n_annot
[docs] def finish(self) -> None: """Concatenate all annotations given by ``append`` method into one dataframe, with the right bounding box format and concatenate it to the Dataset's annotation original DataFrame. """ concatenated_annotations = [] try: bbox = np.concatenate( list(self.annotations_to_append.pop("bbox_coordinates").values()) ) except KeyError: # No bbox coordinates were added return for name, annot in self.annotations_to_append.items(): index = np.concatenate([self.index[k] for k in annot.keys()]) array = np.concatenate(list(annot.values())) concatenated_annotations.append(pd.Series(array, index=index, name=name)) annotations_to_append = pd.concat(concatenated_annotations, axis=1) annotations_to_append_dict = { str(k): annotations_to_append[k].to_numpy() for k in annotations_to_append.columns } add_detection_annotation( self.dataset, format_string=self.format_string, inplace=True, label_map=self.label_map, category_ids_mapping=self.categroy_ids_mapping, bbox_coordinates=bbox, **annotations_to_append_dict, )
def __exit__(self, exit_type: Any, exit_value: Any, traceback: Any) -> None: """Function called at the en of context, when annotations have been appended. Args: exit_type: For context manager compatibility. Not used here. exit_value: For context manager compatibility. Not used here. traceback: For context manager compatibility. Not used here. """ self.finish()