Source code for iaa_od.metrics.alpha

from iaa_od.models import GroundTruthProtocol, KAlphaUnit, Result, KAlphaObservationsMatrix
from iaa_od.models.constants import MIN_ALPHA, MAX_ALPHA, STD_IOU_THR, MIN_IOU_THR, MAX_IOU_THR, INFINITY, EXCLUDED_CATEGORIES
from .iou import iou, iom
import numpy as np
import sklearn as sk
from typing import Optional
from copy import deepcopy

def alpha(
    gts: list[GroundTruthProtocol],
    /,
    *,
    iou_threshold: float = STD_IOU_THR,
    use_iom: bool = False,
    precomputed_result: Optional[Result] = None,
    collapsed_categories: Optional[dict[str, list[int]]] = None,
) -> Result:
    """
    Compute Krippendorff's alpha for a set of ground truths containing
    bounding box annotations.

    The annotations are first grouped into units (see `_find_units`), the
    units are turned into a categories-by-units observations matrix, units
    with one or fewer annotations are dropped, and alpha is computed on the
    remaining matrix. The same computation is then repeated for every image
    of the first ground truth, and the NaN-ignoring mean of the per-image
    values is stored as the mean alpha.

    Parameters:
        gts (list[GroundTruth]): A list of GroundTruth objects containing
            annotations. At least two are required. Image filenames are
            taken from the first ground truth.
        iou_threshold (float): The IoU threshold to consider two annotations
            as overlapping. Must lie in [MIN_IOU_THR, MAX_IOU_THR].
        use_iom (bool): Whether to use IoM matching when finding units.
            Defaults to false.
        precomputed_result (Result | None): An optional precomputed Result
            object, used to merge results from multiple algorithms. When
            given, it is updated in place and returned; otherwise a new
            Result is created.
        collapsed_categories (dict[str, list[int]] | None): An optional
            dictionary to collapse categories into macro-categories, used
            for Scale Complexity computation.

    Returns:
        Result: A Result object containing the computed alpha value and
        other relevant information. If no units are found, alpha is NaN; if
        no annotations survive the filtering, alpha is MIN_ALPHA. In both of
        these cases the function returns early and the per-image values and
        the mean alpha are not written.

    Raises:
        ValueError: If iou_threshold is outside [MIN_IOU_THR, MAX_IOU_THR]
            or fewer than two ground truths are given.
    """
    # Check correctness of IoU threshold
    if not (MIN_IOU_THR <= iou_threshold <= MAX_IOU_THR):
        raise ValueError(f"iou_threshold must be between {MIN_IOU_THR} and {MAX_IOU_THR}")

    # Check that there are at least two ground truths to compare
    if len(gts) < 2:
        raise ValueError("At least two ground truths are required to compute alpha")

    # Initialise result. The explicit None check makes sure that a result
    # object supplied by the caller is always reused, even if it happens to
    # evaluate as falsy.
    result: Result
    if precomputed_result is None:
        result = Result(
            gts=gts,
            iou_thr=iou_threshold,
            iom=use_iom
        )
    else:
        result = precomputed_result

    # Find units in the ground truths
    units: list[KAlphaUnit] = _find_units(gts, iou_threshold, use_iom=use_iom)
    result.units = units

    # If no units were found, return alpha as NaN
    if not units:
        result.alpha = float('nan')
        return result

    # Create the observations matrix and drop units with <= 1 annotation
    unfiltered_observations: KAlphaObservationsMatrix = _build_observations_matrix(
        gts, units, collapsed_categories=collapsed_categories
    )
    result.unfiltered_observations = unfiltered_observations
    observations: KAlphaObservationsMatrix = _filter_units_by_annotations(unfiltered_observations)
    result.observations = observations
    # The number of units removed by the filter is reported as mismatches
    result.mismatched_bounding_boxes = observations.n_filtered_units

    # Compute the global alpha (MIN_ALPHA when nothing survived the filter)
    result.alpha = _alpha_from_observations(observations)

    # With no annotations left there is nothing to compute per image
    if observations.total == 0:
        return result

    # Now compute the alpha coefficient for each image individually
    alpha_per_image: dict[str, float] = {}
    for img_filename in gts[0].images.keys():
        img_units: list[KAlphaUnit] = _filter_units_by_filename(units, img_filename)

        # Images without any unit get NaN so that they are ignored by the mean
        if not img_units:
            alpha_per_image[img_filename] = float('nan')
            continue

        # use_idx=True: unit IDs of a single image are not contiguous, so the
        # position in img_units is used as the column index instead
        img_observations: KAlphaObservationsMatrix = _build_observations_matrix(
            gts, img_units, use_idx=True, collapsed_categories=collapsed_categories
        )
        img_observations = _filter_units_by_annotations(img_observations)
        alpha_per_image[img_filename] = _alpha_from_observations(img_observations)

    # Save the alpha per image object in the result
    result.alpha_per_image = deepcopy(alpha_per_image)

    # Compute mean alpha, ignoring NaN entries. np.nanmean emits a
    # RuntimeWarning on empty or all-NaN input, so that case is handled
    # explicitly (the value is NaN either way).
    per_image_values: np.ndarray = np.asarray(list(alpha_per_image.values()), dtype=float)
    mean_alpha: float
    if per_image_values.size == 0 or bool(np.isnan(per_image_values).all()):
        mean_alpha = float('nan')
    else:
        mean_alpha = float(np.nanmean(per_image_values))

    # Save the mean alpha in the result object
    result.mean_alpha = mean_alpha

    # Return the result object
    return result


def _alpha_from_observations(observations: KAlphaObservationsMatrix) -> float:
    """
    Compute Krippendorff's alpha from an already filtered observations matrix.

    All pairs of distinct categories are weighted equally (no distance
    metric between categories is applied).

    Parameters:
        observations (KAlphaObservationsMatrix): The observations matrix
            (categories as rows, units as columns) with its totals. Every
            column total must be greater than one, as guaranteed by
            `_filter_units_by_annotations`; a column total of one would
            cause a division by zero.

    Returns:
        float: MIN_ALPHA if the matrix holds no annotations, MAX_ALPHA if
        the expected disagreement is zero, otherwise 1 - Do / De.
    """
    # No annotations at all
    if observations.total == 0:
        return MIN_ALPHA

    matrix: np.ndarray = observations.observations_matrix
    n_categories: int = matrix.shape[0]
    n_units: int = matrix.shape[1]
    n_minus_one = observations.total - 1

    # Observed disagreement (Do): for every unit, sum the products of the
    # counts of each pair of distinct categories, weighted by 1 / (m_u - 1)
    # where m_u is the number of annotations in the unit
    observed: float = 0.0
    for unit_idx in range(n_units):
        unit_weight: float = 1.0 / (float(observations.column_totals[unit_idx]) - 1.0)
        pair_total: float = 0.0
        for c in range(n_categories):
            for k in range(c + 1, n_categories):
                pair_total += float(matrix[c, unit_idx] * matrix[k, unit_idx])
        observed += unit_weight * pair_total
    observed = observed * n_minus_one

    # Expected disagreement (De): products of the overall counts of each
    # pair of distinct categories
    n_rows: int = len(observations.row_totals)
    expected: float = 0.0
    for c in range(n_rows):
        for k in range(c + 1, n_rows):
            expected += float(observations.row_totals[c] * observations.row_totals[k])

    # A zero De means that at most one category is in use
    if expected == 0.0:
        return MAX_ALPHA

    return 1.0 - (observed / expected)
def _find_units(gts: list[GroundTruthProtocol], iou_threshold: float, use_iom: bool = False) -> list[KAlphaUnit]:
    """
    Identify "units" as defined by Krippendorff for his alpha metric.

    In our context, a unit is a set of overlapping bounding boxes which
    return an IoU greater than the threshold passed as an argument. Units
    are found, image by image, through agglomerative clustering (average
    linkage) on the pairwise distances 1 - IoU between all bounding boxes of
    the image. Two boxes coming from the same ground truth are given the
    distance INFINITY.

    Every annotation assigned to a unit has its `unit_id` attribute set to
    the ID of that unit. Images with fewer than two annotations overall
    produce no units.

    Parameters:
        gts (list[GroundTruth]): A list of GroundTruth objects containing
            annotations. Image filenames are taken from the first one.
        iou_threshold (float): The IoU threshold to consider two annotations
            as overlapping.
        use_iom (bool): Whether to use IoM to find units. Defaults to false.

    Returns:
        list[KAlphaUnit]: A list of KAlphaUnit objects representing the
        identified units, with IDs numbered consecutively from 0 across all
        images.

    Raises:
        ValueError: If iou_threshold is not between 0 and 1, or fewer than
            two ground truths are given.
    """
    # Check that the iou_threshold is between 0 and 1
    if not (0.0 <= iou_threshold <= 1.0):
        raise ValueError("iou_threshold must be between 0 and 1")

    # Check that there are at least two ground truths to compare
    if len(gts) < 2:
        raise ValueError("At least two ground truths are required to find units")

    # Import the estimator explicitly: a plain `import sklearn` is not
    # guaranteed to expose `sklearn.cluster` as an attribute on every
    # scikit-learn version.
    from sklearn.cluster import AgglomerativeClustering

    # Set correct IoU function based on leniency
    iou_fn = iom if use_iom else iou

    # Initialise unit list
    units: list[KAlphaUnit] = []
    unit_id_counter: int = 0

    # Get each image one by one and find units within it
    image_filenames: list[str] = list(gts[0].images.keys())
    for img_filename in image_filenames:
        # Get all annotations for this image from all ground truths
        img_annotations: list = []
        for gt in gts:
            img_annotations.extend(gt.annotations[img_filename])

        # Clustering needs at least two annotations
        if len(img_annotations) < 2:
            continue

        # Create the symmetric distance matrix for the annotations based on
        # IoU (the diagonal stays at zero)
        rank = len(img_annotations)
        distance_matrix: np.ndarray = np.zeros((rank, rank))
        for i in range(rank):
            for j in range(i + 1, rank):
                if img_annotations[i].gt_name == img_annotations[j].gt_name:
                    # Boxes from the same ground truth are kept far apart
                    distance = INFINITY
                else:
                    distance = 1.0 - iou_fn(img_annotations[i], img_annotations[j])
                distance_matrix[i, j] = distance
                distance_matrix[j, i] = distance

        # Set up agglomerative clustering; n_clusters=None lets the distance
        # threshold decide how many clusters are produced
        clustering = AgglomerativeClustering(
            n_clusters=None,
            metric='precomputed',
            linkage='average',
            distance_threshold=1.0 - iou_threshold
        )
        labels = clustering.fit_predict(distance_matrix)

        # Group annotations into units based on clustering labels
        current_units: list[KAlphaUnit] = []
        for label in np.unique(labels):
            # Create new unit for each unique cluster ID
            new_unit: KAlphaUnit = KAlphaUnit(id=unit_id_counter, img_filename=img_filename)

            # Append all annotations in the cluster to that unit
            unit_annotations = [img_annotations[i] for i in range(rank) if labels[i] == label]
            for annotation in unit_annotations:
                annotation.unit_id = unit_id_counter
            new_unit.annotations.extend(unit_annotations)

            current_units.append(new_unit)

            # Increment the global unit ID counter for the next unit
            unit_id_counter += 1

        units.extend(current_units)

    return units


def _build_observations_matrix(gts: list[GroundTruthProtocol], units: list[KAlphaUnit], use_idx: bool = False, collapsed_categories: Optional[dict[str, list[int]]] = None) -> KAlphaObservationsMatrix:
    """
    Build the observations matrix required to compute Krippendorff's alpha
    (according to the definition in his 2011 paper).

    The matrix has one row per category and one column per unit; each cell
    counts the annotations of that category inside that unit.

    Parameters:
        gts (list[GroundTruth]): A list of GroundTruth objects containing
            annotations. The number of categories is taken from the
            `categories_dict` of the first one.
        units (list[KAlphaUnit]): A list of KAlphaUnit objects representing
            the identified units.
        use_idx (bool): Whether to use the index of the unit in the units
            list as the column ID, instead of the unit's own ID. This is
            used for computing alpha per image, where units are filtered by
            image and their IDs may not be contiguous.
        collapsed_categories (dict[str, list[int]] | None): An optional
            dictionary to collapse categories into macro-categories, used
            for Scale Complexity computation. Each macro-category becomes
            one row holding the sum of the rows of its sub-categories.

    Returns:
        KAlphaObservationsMatrix: An object containing the observations
        matrix and related totals.
    """
    # Create a matrix with the number of units as columns and the number of categories as rows
    n_categories = len(gts[0].categories_dict)
    n_units = len(units)
    observations_matrix: np.ndarray = np.zeros((n_categories, n_units), dtype=int)

    # Fill the observations matrix. enumerate() provides the list position
    # directly instead of searching for it with units.index() on every unit.
    for position, unit in enumerate(units):
        unit_id: int = position if use_idx else unit.id
        for annotation in unit.annotations:
            # Category IDs are converted to 0-based row indices.
            # NOTE(review): the column index is also shifted by one. With
            # 0-based IDs/positions the first unit ends up in the last column
            # through negative indexing, which only permutes the columns.
            # Kept unchanged in case other callers pass 1-based unit IDs -
            # confirm the intended ID base.
            observations_matrix[annotation.category_id - 1, unit_id - 1] += 1

    # Collapse the observations matrix if required
    if collapsed_categories:
        collapsed_matrix: np.ndarray = np.zeros((len(collapsed_categories), n_units), dtype=int)
        for idx, (macrocat_name, sub_cat_ids) in enumerate(collapsed_categories.items()):
            # Excluded macro-categories keep an all-zero row.
            # NOTE(review): this is an equality test; if EXCLUDED_CATEGORIES
            # is a collection of names it never matches and `in` was probably
            # intended - confirm against the constants module.
            if macrocat_name.lower() == EXCLUDED_CATEGORIES:
                continue
            for sub_cat_id in sub_cat_ids:
                collapsed_matrix[idx, :] += observations_matrix[sub_cat_id - 1, :]
        observations_matrix = collapsed_matrix

    # Compute row totals, column totals and grand total
    row_totals: np.ndarray = observations_matrix.sum(axis=1)
    col_totals: np.ndarray = observations_matrix.sum(axis=0)
    grand_total: int = observations_matrix.sum()

    # Sanity check
    assert grand_total == col_totals.sum() == row_totals.sum()

    # Build and return the KAlphaObservationsMatrix object
    return KAlphaObservationsMatrix(
        observations_matrix=observations_matrix,
        row_totals=row_totals,
        column_totals=col_totals,
        total=grand_total
    )


def _filter_units_by_annotations(observations: KAlphaObservationsMatrix) -> KAlphaObservationsMatrix:
    """
    Filter out units (columns) from the observations matrix which have one
    or less annotations, as required by Krippendorff's alpha definition in
    his 2011 paper.

    Parameters:
        observations (KAlphaObservationsMatrix): An object containing the
            observations matrix and related totals.

    Returns:
        KAlphaObservationsMatrix: A new object holding the filtered matrix,
        its recomputed totals, the number of removed units
        (`n_filtered_units`) and the `macro_categories_dict` of the input.
    """
    # Find all columns (units) which have one or less annotations
    columns_to_remove: np.ndarray = np.flatnonzero(observations.column_totals <= 1)

    # Remove all such columns from the observations matrix
    filtered_matrix = np.delete(observations.observations_matrix, columns_to_remove, axis=1)

    # Recompute totals
    row_totals: np.ndarray = filtered_matrix.sum(axis=1)
    col_totals: np.ndarray = filtered_matrix.sum(axis=0)
    grand_total: int = filtered_matrix.sum()

    # Sanity check
    assert grand_total == col_totals.sum() == row_totals.sum()

    # Build and return the new KAlphaObservationsMatrix object
    return KAlphaObservationsMatrix(
        observations_matrix=filtered_matrix,
        row_totals=row_totals,
        column_totals=col_totals,
        total=grand_total,
        n_filtered_units=int(columns_to_remove.size),
        macro_categories_dict=observations.macro_categories_dict
    )


def _filter_units_by_filename(units: list[KAlphaUnit], filename: str) -> list[KAlphaUnit]:
    """
    Filter the list of units to only include those associated with a
    specific image filename.

    Parameters:
        units (list[KAlphaUnit]): A list of KAlphaUnit objects representing
            the identified units.
        filename (str): The image filename to filter units by.

    Returns:
        list[KAlphaUnit]: A new list with the matching units, in their
        original order.
    """
    return [unit for unit in units if unit.img_filename == filename]