from iaa_od.models import GroundTruthProtocol, KAlphaUnit, Result, KAlphaObservationsMatrix
from iaa_od.models.constants import MIN_ALPHA, MAX_ALPHA, STD_IOU_THR, MIN_IOU_THR, MAX_IOU_THR, INFINITY, EXCLUDED_CATEGORIES
from .iou import iou, iom
import numpy as np
import sklearn as sk
from typing import Optional
from copy import deepcopy
[docs]
def alpha(gts: list[GroundTruthProtocol], /, *, iou_threshold: float = STD_IOU_THR, use_iom: bool = False, precomputed_result: Optional[Result] = None, collapsed_categories: Optional[dict[str, list[int]]] = None) -> Result:
    """
    This function computes Krippendorff's alpha for a set of ground truths containing bounding box annotations.

    The coefficient is computed once over all units and once per image (the images
    iterated are the keys of ``gts[0].images``); the NaN-ignoring mean of the
    per-image values is stored as well.

    Parameters:
        gts (list[GroundTruth]): A list of GroundTruth objects containing annotations.
        iou_threshold (float): The IoU threshold to consider two annotations as overlapping.
        use_iom (bool): Whether to use IoM matching when finding units. Defaults to false.
        precomputed_result (Result | None): An optional precomputed Result object, used to merge results from multiple algorithms. When given, it is updated in place and returned.
        collapsed_categories (dict[str, list[int]] | None): An optional dictionary to collapse categories into macro-categories, used for Scale Complexity computation.

    Returns:
        Result: A Result object containing the computed alpha value and other relevant information.
        The fields written here are ``units``, ``alpha``, ``unfiltered_observations``,
        ``observations``, ``mismatched_bounding_boxes``, ``alpha_per_image`` and ``mean_alpha``.
        Special values of ``alpha``:
        * NaN when no units are found (nothing else is filled in);
        * MIN_ALPHA when no annotation survives the filtering of units (the per-image
          values and the mean are not filled in);
        * MAX_ALPHA when the expected disagreement is zero.

    Raises:
        ValueError: If ``iou_threshold`` is outside [MIN_IOU_THR, MAX_IOU_THR] or fewer
            than two ground truths are given.
    """
    # Check correctness of IoU threshold
    if not (MIN_IOU_THR <= iou_threshold <= MAX_IOU_THR):
        raise ValueError(f"iou_threshold must be between {MIN_IOU_THR} and {MAX_IOU_THR}")
    # Check that there are at least two ground truths to compare
    if len(gts) < 2:
        raise ValueError("At least two ground truths are required to compute alpha")

    # Initialise result. Test identity with None rather than truthiness, so that a
    # caller-supplied Result is never silently replaced if it happens to be falsy.
    result: Result
    if precomputed_result is None:
        result = Result(
            gts=gts,
            iou_thr=iou_threshold,
            iom=use_iom
        )
    else:
        result = precomputed_result

    # Find units in the ground truths (this also tags every annotation with its unit id)
    units: list[KAlphaUnit] = _find_units(gts, iou_threshold, use_iom=use_iom)
    result.units = units
    # If no units were found, return alpha as NaN
    if len(units) == 0:
        result.alpha = float('nan')
        return result

    # Create the observations matrix, then drop the units with at most one annotation
    unfiltered_observations: KAlphaObservationsMatrix = _build_observations_matrix(gts, units, collapsed_categories=collapsed_categories)
    result.unfiltered_observations = unfiltered_observations
    observations = _filter_units_by_annotations(unfiltered_observations)
    result.observations = observations
    result.mismatched_bounding_boxes = observations.n_filtered_units
    # If there are no annotations after filtering, return alpha as MIN_ALPHA
    if observations.total == 0:
        result.alpha = MIN_ALPHA
        return result

    # Compute the global alpha from the observations matrix
    result.alpha = _alpha_from_observations(observations)

    # Group the units by image once, instead of rescanning the full list per image
    units_by_image: dict[str, list[KAlphaUnit]] = {}
    for unit in units:
        units_by_image.setdefault(unit.img_filename, []).append(unit)

    # Now compute the alpha coefficient for each image individually
    alpha_per_image: dict[str, float] = {}
    for img_filename in gts[0].images.keys():
        img_units: list[KAlphaUnit] = units_by_image.get(img_filename, [])
        if len(img_units) == 0:
            alpha_per_image[img_filename] = float('nan')
            continue
        img_observations: KAlphaObservationsMatrix = _build_observations_matrix(gts, img_units, use_idx=True, collapsed_categories=collapsed_categories)
        img_observations = _filter_units_by_annotations(img_observations)
        alpha_per_image[img_filename] = _alpha_from_observations(img_observations)
    # Save the alpha per image object in the result
    result.alpha_per_image = deepcopy(alpha_per_image)

    # Compute mean alpha, ignoring NaN entries. np.nanmean warns when every entry is
    # NaN (or there are no entries), so that case is answered directly with NaN.
    per_image_values: list[float] = list(alpha_per_image.values())
    mean_alpha: float
    if any(not np.isnan(value) for value in per_image_values):
        mean_alpha = float(np.nanmean(per_image_values))
    else:
        mean_alpha = float('nan')
    # Save the mean alpha in the result object
    result.mean_alpha = mean_alpha
    # Return the result object
    return result


def _alpha_from_observations(observations: KAlphaObservationsMatrix) -> float:
    """
    Compute Krippendorff's alpha from an already filtered observations matrix.

    With n_uc the count of category c in unit u, n_u the column totals, n_c the row
    totals and n the grand total, the value returned is

        alpha = 1 - (n - 1) * sum_u[ sum_{c<k} n_uc * n_uk / (n_u - 1) ] / sum_{c<k} n_c * n_k

    Parameters:
        observations (KAlphaObservationsMatrix): The observations matrix and its totals. Every column total must differ from 1 (guaranteed after _filter_units_by_annotations).

    Returns:
        float: MIN_ALPHA if the matrix holds no annotations, MAX_ALPHA if the expected disagreement is zero, the alpha coefficient otherwise.
    """
    if observations.total == 0:
        return MIN_ALPHA

    # Work in 64-bit integers so that the squared totals below cannot overflow a narrower platform int
    matrix: np.ndarray = np.asarray(observations.observations_matrix, dtype=np.int64)
    row_totals: np.ndarray = np.asarray(observations.row_totals, dtype=np.int64)

    # Number of mismatching pairs inside each unit. For non-negative counts a_c,
    # (sum a)^2 = sum a^2 + 2 * sum_{c<k} a_c * a_k, hence
    # sum_{c<k} a_c * a_k = ((sum a)^2 - sum a^2) / 2 (the difference is always even).
    unit_pairs: np.ndarray = (matrix.sum(axis=0) ** 2 - (matrix ** 2).sum(axis=0)) // 2

    # Observed disagreement (Do), accumulated unit by unit
    observed: float = 0.0
    for n_u, pairs in zip(np.asarray(observations.column_totals).tolist(), unit_pairs.tolist()):
        observed += (1.0 / (float(n_u) - 1.0)) * float(pairs)
    observed *= int(observations.total) - 1

    # Expected disagreement (De): the same pair count, applied to the row totals
    expected: float = float((int(row_totals.sum()) ** 2 - int((row_totals ** 2).sum())) // 2)
    # If De is zero (all annotations share one category), return MAX_ALPHA
    if expected == 0.0:
        return MAX_ALPHA
    return 1.0 - (observed / expected)
def _find_units(gts: list[GroundTruthProtocol], iou_threshold: float, use_iom: bool = False) -> list[KAlphaUnit]:
    """
    This function identifies "units" as defined by Krippendorff for his alpha metric.
    In our context, a unit is a set of overlapping bounding boxes which return an IoU greater than
    the threshold passed as an argument.
    Units are found, image by image, through agglomerative clustering (average linkage, precomputed
    distances, distance threshold ``1 - iou_threshold``) on the distances ``1 - IoU`` between all
    bounding boxes of that image. Two boxes coming from the same ground truth are given the
    distance INFINITY.

    Side effect: every clustered annotation gets its ``unit_id`` attribute set to the id of its unit.
    Unit ids are assigned from a single counter starting at 0 and shared by all images.

    Parameters:
        gts (list[GroundTruth]): A list of GroundTruth objects containing annotations.
        iou_threshold (float): The IoU threshold to consider two annotations as overlapping.
        use_iom (bool): Whether to use IoM to find units. Defaults to false.

    Returns:
        list[KAlphaUnit]: A list of KAlphaUnit objects representing the identified units.

    Raises:
        ValueError: If ``iou_threshold`` is not between 0 and 1, or fewer than two ground truths are given.
    """
    # Check that the iou_threshold is between 0 and 1
    if not (0.0 <= iou_threshold <= 1.0):
        raise ValueError("iou_threshold must be between 0 and 1")
    # Check that there are at least two ground truths to compare
    if len(gts) < 2:
        raise ValueError("At least two ground truths are required to find units")

    # Import the submodule explicitly: a bare `import sklearn` is not guaranteed to
    # make `sklearn.cluster` reachable as an attribute of the package.
    from sklearn.cluster import AgglomerativeClustering

    # Set correct IoU function based on leniency
    iou_fn = iom if use_iom else iou
    # Initialise unit list
    units: list[KAlphaUnit] = []
    unit_id_counter: int = 0
    # Get each image one by one and find units within it
    image_filenames: list[str] = list(gts[0].images.keys())
    for img_filename in image_filenames:
        # Get all annotations for this image from all ground truths
        img_annotations: list = []
        for gt in gts:
            img_annotations.extend(gt.annotations[img_filename])
        # Clustering needs at least two samples.
        # NOTE(review): an image holding a single annotation therefore yields no unit
        # at all, so that annotation is not counted among the filtered units downstream
        # - confirm this is intended.
        if len(img_annotations) < 2:
            continue
        # Create the symmetric distance matrix for the annotations based on IoU
        rank = len(img_annotations)
        distance_matrix: np.ndarray = np.zeros((rank, rank))
        for i in range(rank):
            for j in range(i + 1, rank):
                if img_annotations[i].gt_name == img_annotations[j].gt_name:
                    # Boxes from the same ground truth are kept as far apart as possible
                    distance = INFINITY
                else:
                    distance = 1.0 - iou_fn(img_annotations[i], img_annotations[j])
                distance_matrix[i, j] = distance
                distance_matrix[j, i] = distance
        # Set up agglomerative clustering
        ac = AgglomerativeClustering(
            n_clusters=None,
            metric='precomputed',
            linkage='average',
            distance_threshold=1.0 - iou_threshold
        )
        labels = ac.fit_predict(distance_matrix)
        # Group annotations by cluster label in a single pass (annotation order is kept)
        members_by_label: dict[int, list] = {}
        for annotation, label in zip(img_annotations, labels.tolist()):
            members_by_label.setdefault(label, []).append(annotation)
        # Create one unit per cluster, in increasing label order
        for label in sorted(members_by_label):
            unit_annotations = members_by_label[label]
            new_unit: KAlphaUnit = KAlphaUnit(id=unit_id_counter, img_filename=img_filename)
            # Tag every annotation of the cluster with the id of its unit
            for annotation in unit_annotations:
                annotation.unit_id = unit_id_counter
            new_unit.annotations.extend(unit_annotations)
            units.append(new_unit)
            # Increment the global unit ID counter for the next unit
            unit_id_counter += 1
    return units
def _build_observations_matrix(gts: list[GroundTruthProtocol], units: list[KAlphaUnit], use_idx: bool = False, collapsed_categories: Optional[dict[str, list[int]]] = None) -> KAlphaObservationsMatrix:
    """
    This function builds the observations matrix required to compute Krippendorff's alpha
    (according to the definition in his 2011 paper).

    Rows are categories (one per entry of ``gts[0].categories_dict``, or one per macro-category
    when ``collapsed_categories`` is given) and columns are units; each cell counts the annotations
    of that category inside that unit. Column ``j`` always corresponds to the unit stored in column
    position ``j``: the unit id when ``use_idx`` is false, the position in ``units`` otherwise
    (both are 0-based).

    Parameters:
        gts (list[GroundTruth]): A list of GroundTruth objects containing annotations.
        units (list[KAlphaUnit]): A list of KAlphaUnit objects representing the identified units.
        use_idx (bool): Whether to use the index of the unit in the units list as the column ID, instead of the unit's own ID. This is used for computing alpha per image, where units are filtered by image and their IDs may not be contiguous.
        collapsed_categories (dict[str, list[int]] | None): An optional dictionary to collapse categories into macro-categories, used for Scale Complexity computation.

    Returns:
        KAlphaObservationsMatrix: An object containing the observations matrix and related totals.
    """
    # Create a matrix with the number of units as columns and the number of categories as rows
    n_categories = len(gts[0].categories_dict)
    n_units = len(units)
    observations_matrix: np.ndarray = np.zeros((n_categories, n_units), dtype=int)
    # Fill the observations matrix
    for position, unit in enumerate(units):
        # Unit ids and list positions both start at 0, so they are used as column
        # indices directly. (Subtracting 1 here used to send the first unit to the
        # last column through negative indexing, shifting every column by one.)
        column: int = position if use_idx else unit.id
        for annotation in unit.annotations:
            # Row index is category_id - 1 - presumably category ids are 1-based and
            # contiguous; TODO confirm against the GroundTruth loader.
            observations_matrix[annotation.category_id - 1, column] += 1
    # Collapse the observations matrix if required
    if collapsed_categories:
        collapsed_matrix: np.ndarray = np.zeros((len(collapsed_categories), n_units), dtype=int)
        for idx, (macrocat_name, sub_cat_ids) in enumerate(collapsed_categories.items()):
            # An excluded macro-category keeps its (all-zero) row but receives no counts.
            # NOTE(review): this is an equality test against EXCLUDED_CATEGORIES; if that
            # constant is a collection of names rather than a single string, the test is
            # never true and a membership test is needed - verify against the constants module.
            if macrocat_name.lower() == EXCLUDED_CATEGORIES:
                continue
            for sub_cat_id in sub_cat_ids:
                collapsed_matrix[idx, :] += observations_matrix[sub_cat_id - 1, :]
        observations_matrix = collapsed_matrix
    # Compute row totals
    row_totals: np.ndarray = observations_matrix.sum(axis=1)
    # Compute column totals
    col_totals: np.ndarray = observations_matrix.sum(axis=0)
    # Compute grand total
    grand_total: int = observations_matrix.sum()
    # Sanity check
    assert grand_total == col_totals.sum() == row_totals.sum()
    # Build KAlphaObservationsMatrix object
    observations = KAlphaObservationsMatrix(
        observations_matrix=observations_matrix,
        row_totals=row_totals,
        column_totals=col_totals,
        total=grand_total
    )
    # Return the observations matrix
    return observations
def _filter_units_by_annotations(observations: KAlphaObservationsMatrix) -> KAlphaObservationsMatrix:
    """
    Drop from the observations matrix every unit (column) that holds one annotation or fewer,
    as required by Krippendorff's alpha definition in his 2011 paper.

    The input object is left untouched; a new object is built from the remaining columns,
    with all totals recomputed.

    Parameters:
        observations (KAlphaObservationsMatrix): An object containing the observations matrix and related totals.

    Returns:
        KAlphaObservationsMatrix: The filtered observations matrix object. Its ``n_filtered_units`` is the number of removed columns and its ``macro_categories_dict`` is carried over from the input.
    """
    # Indices of the columns whose total is at most one
    sparse_columns: list[int] = [
        idx
        for idx in range(observations.column_totals.shape[0])
        if observations.column_totals[idx] <= 1
    ]
    # Matrix without those columns
    kept_matrix = np.delete(observations.observations_matrix, sparse_columns, axis=1)
    # Totals of what is left
    per_category: np.ndarray = kept_matrix.sum(axis=1)
    per_unit: np.ndarray = kept_matrix.sum(axis=0)
    overall: int = kept_matrix.sum()
    # Sanity check
    assert overall == per_unit.sum() == per_category.sum()
    return KAlphaObservationsMatrix(
        observations_matrix=kept_matrix,
        row_totals=per_category,
        column_totals=per_unit,
        total=overall,
        n_filtered_units=len(sparse_columns),
        macro_categories_dict=observations.macro_categories_dict,
    )
def _filter_units_by_filename(units: list[KAlphaUnit], filename: str) -> list[KAlphaUnit]:
    """
    Select the units that belong to one image.

    Parameters:
        units (list[KAlphaUnit]): A list of KAlphaUnit objects representing the identified units.
        filename (str): The image filename to filter units by.

    Returns:
        list[KAlphaUnit]: A new list with the units whose ``img_filename`` equals ``filename``, in their original order.
    """
    matching: list[KAlphaUnit] = []
    for candidate in units:
        if candidate.img_filename == filename:
            matching.append(candidate)
    return matching