from iaa_od.models import Result, AnnotationProtocol, Granularities, STD_IOM_THR
from iaa_od.utils import find_all_contained_bboxes
from copy import deepcopy
[docs]
def all_granularities(result: Result, /, *, iom_thr: float = STD_IOM_THR) -> Granularities:
    """
    Compute dataset-wide Count and Size Granularities for every annotator in `result`.

    For each unit, the annotation with the largest bounding-box area is taken as a
    "pseudo-unit". `find_all_contained_bboxes` is then asked which of the image's annotations
    (pooled across all Ground Truths) are contained in it, using `iom_thr`. Units for which
    nothing is returned are counted as singletons and skipped. For every other unit, each
    annotator receives its share of the annotation count and of the summed bounding-box area.
    Zero shares are discarded, the remaining shares are averaged per annotator, the averages
    are divided by their median, and the result is ordered by ascending value.

    Parameters:
        result (Result): The Result object containing the Ground Truths and Units to compute granularities on.
        iom_thr (float): The Intersection over Minimum threshold forwarded to
            `find_all_contained_bboxes`. Defaults to STD_IOM_THR.

    Returns:
        Granularities: The normalised count and size values per annotator, the median of each
            (before normalisation) with the annotator(s) sitting at that median, and the
            number of singleton units.

    Raises:
        ValueError: If the summed area of a non-singleton unit is zero, or if at least one
            annotator has no non-zero contribution over the whole dataset.
    """
    # Pool the annotations of every annotator, keyed by the image they belong to.
    annotations_by_image: dict[str, list[AnnotationProtocol]] = {}
    for gt in result.gts:
        for img_filename, annotations in gt.annotations.items():
            if len(annotations) == 0:
                continue
            annotations_by_image.setdefault(img_filename, []).extend(deepcopy(annotations))

    gt_names: list[str] = [gt.name for gt in result.gts]

    # Per-annotator lists of non-zero per-unit shares, averaged at the end.
    count_shares: dict[str, list[float]] = {name: [] for name in gt_names}
    size_shares: dict[str, list[float]] = {name: [] for name in gt_names}
    # Number of units for which no contained annotation was found.
    singletons: int = 0

    for unit in result.units:
        # The largest annotation of the unit acts as the container to match against.
        largest: AnnotationProtocol = max(unit.annotations, key=lambda ann: ann.bbox_coords.area)
        contained: list[AnnotationProtocol] = find_all_contained_bboxes(
            largest,
            annotations_by_image[unit.img_filename],
            iom_thr=iom_thr,
        )
        if not contained:
            singletons += 1
            continue

        # Tally the container first and then everything contained in it, per annotator.
        # Totals are accumulated sequentially so the floating-point result does not depend
        # on the summation strategy of `sum`.
        per_gt_count: dict[str, int] = dict.fromkeys(gt_names, 0)
        per_gt_area: dict[str, float] = dict.fromkeys(gt_names, 0.0)
        total_count: int = 0
        total_area: float = 0.0
        for ann in [largest, *contained]:
            per_gt_count[ann.gt_name] += 1
            total_count += 1
            area = ann.bbox_coords.area
            per_gt_area[ann.gt_name] += area
            total_area += area

        # The totals do not change while distributing the shares, so validate them once.
        if total_count == 0:
            raise ValueError("Something went wrong computing the count granularity contribution for the current unit. Total count is zero.")
        if total_area == 0.0:
            raise ValueError("Something went wrong computing the size granularity contribution for the current unit. Total size is zero.")

        # Record each annotator's share of this unit; zero shares are dropped right away.
        for name, count in per_gt_count.items():
            count_share = float(count) / float(total_count)
            if count_share > 0.0:
                count_shares[name].append(count_share)
            size_share = float(per_gt_area[name]) / float(total_area)
            if size_share > 0.0:
                size_shares[name].append(size_share)

    # Average the shares per annotator; every annotator must have at least one.
    if any(len(shares) == 0 for shares in count_shares.values()):
        raise ValueError("Something went wrong computing the final count granularities. At least one annotator has no contributions after filtering.")
    count_means: dict[str, float] = {
        name: float(sum(shares)) / float(len(shares)) for name, shares in count_shares.items()
    }
    if any(len(shares) == 0 for shares in size_shares.values()):
        raise ValueError("Something went wrong computing the final size granularities. At least one annotator has no contributions after filtering.")
    size_means: dict[str, float] = {
        name: float(sum(shares)) / float(len(shares)) for name, shares in size_shares.items()
    }

    # The median of the averages is the normalisation divisor; its key(s) are the benchmark.
    count_benchmark, count_median = _median_with_keys(count_means)
    size_benchmark, size_median = _median_with_keys(size_means)

    # Normalise first, then order by the normalised value (ascending) for representation.
    count_values: dict[str, float] = dict(
        sorted(
            ((name, mean / count_median) for name, mean in count_means.items()),
            key=lambda item: item[1],
        )
    )
    size_values: dict[str, float] = dict(
        sorted(
            ((name, mean / size_median) for name, mean in size_means.items()),
            key=lambda item: item[1],
        )
    )

    return Granularities(
        count=count_values,
        count_benchmark=count_benchmark,
        count_median=count_median,
        size=size_values,
        size_benchmark=size_benchmark,
        size_median=size_median,
        singletons=singletons,
    )
[docs]
def per_image_granularities(result: Result, /, *, iom_thr: float = STD_IOM_THR, benchmark: Granularities | None = None) -> dict[str, Granularities]:
    """
    Compute Count and Size Granularities separately for each image in the dataset.

    The per-unit computation is the same as in `all_granularities`, but the contributions are
    aggregated over the units of one image at a time, yielding one Granularities object per
    image.

    Normalisation behaviour is controlled by `benchmark`:

    - When `benchmark` is None (default), each image is self-contained: the per-annotator
      values are divided by the median computed *within that image*, so the benchmark
      annotator may differ from one image to the next.
    - When `benchmark` is supplied (typically the result of `all_granularities(result)`),
      every image is divided by `benchmark.count_median` and `benchmark.size_median`, and
      `benchmark.count_benchmark` / `benchmark.size_benchmark` are carried over unchanged.
      Values are then on one shared scale across images.

    Images for which no annotator has a non-zero contribution (for instance, images whose
    units are all singletons) are omitted from the result. For a given image, only annotators
    with at least one non-zero contribution on that image appear in the returned dictionaries.

    Parameters:
        result (Result): The Result object containing the Ground Truths and Units.
        iom_thr (float): The Intersection over Minimum threshold forwarded to
            `find_all_contained_bboxes`. Defaults to STD_IOM_THR.
        benchmark (Granularities | None): If provided, its medians are used as the
            normalisation divisor for every image instead of each image's own median.
            Defaults to None.

    Returns:
        dict[str, Granularities]: A mapping from image filename to the Granularities computed
            from that image's units alone.

    Raises:
        ValueError: If the summed area of a non-singleton unit is zero.
    """
    # Pool the annotations of every annotator, keyed by the image they belong to.
    annotations_by_image: dict[str, list[AnnotationProtocol]] = {}
    for gt in result.gts:
        for gt_img_filename, annotations in gt.annotations.items():
            if len(annotations) == 0:
                continue
            annotations_by_image.setdefault(gt_img_filename, []).extend(deepcopy(annotations))

    # Bucket the units by the image they were drawn on.
    units_by_image: dict[str, list] = {}
    for unit in result.units:
        units_by_image.setdefault(unit.img_filename, []).append(unit)

    gt_names: list[str] = [gt.name for gt in result.gts]
    granularities_by_image: dict[str, Granularities] = {}

    for img_filename, image_units in units_by_image.items():
        # Per-annotator lists of non-zero per-unit shares for this image only.
        count_shares: dict[str, list[float]] = {name: [] for name in gt_names}
        size_shares: dict[str, list[float]] = {name: [] for name in gt_names}
        # Number of units on this image for which no contained annotation was found.
        singletons: int = 0

        for unit in image_units:
            # The largest annotation of the unit acts as the container to match against.
            largest: AnnotationProtocol = max(unit.annotations, key=lambda ann: ann.bbox_coords.area)
            contained: list[AnnotationProtocol] = find_all_contained_bboxes(
                largest,
                annotations_by_image[unit.img_filename],
                iom_thr=iom_thr,
            )
            if not contained:
                singletons += 1
                continue

            # Tally the container first and then everything contained in it, per annotator.
            # Totals are accumulated sequentially to keep the floating-point result stable.
            per_gt_count: dict[str, int] = dict.fromkeys(gt_names, 0)
            per_gt_area: dict[str, float] = dict.fromkeys(gt_names, 0.0)
            total_count: int = 0
            total_area: float = 0.0
            for ann in [largest, *contained]:
                per_gt_count[ann.gt_name] += 1
                total_count += 1
                area = ann.bbox_coords.area
                per_gt_area[ann.gt_name] += area
                total_area += area

            # The totals do not change while distributing the shares, so validate them once.
            if total_count == 0:
                raise ValueError("Something went wrong computing the count granularity contribution for the current unit. Total count is zero.")
            if total_area == 0.0:
                raise ValueError("Something went wrong computing the size granularity contribution for the current unit. Total size is zero.")

            # Record each annotator's share of this unit; zero shares are dropped right away.
            for name, count in per_gt_count.items():
                count_share = float(count) / float(total_count)
                if count_share > 0.0:
                    count_shares[name].append(count_share)
                size_share = float(per_gt_area[name]) / float(total_area)
                if size_share > 0.0:
                    size_shares[name].append(size_share)

        # Average per annotator, keeping only annotators with at least one contribution.
        # An annotator may be absent from an image, so an empty list is not an error here.
        count_means: dict[str, float] = {
            name: float(sum(shares)) / float(len(shares))
            for name, shares in count_shares.items()
            if len(shares) > 0
        }
        size_means: dict[str, float] = {
            name: float(sum(shares)) / float(len(shares))
            for name, shares in size_shares.items()
            if len(shares) > 0
        }
        # Nothing to report for this image when no annotator contributed.
        if not count_means or not size_means:
            continue

        # Pick the benchmark annotator(s) and the divisor to normalise against.
        if benchmark is not None:
            # Shared scale: reuse the supplied benchmark for every image.
            count_benchmark, count_median = benchmark.count_benchmark, benchmark.count_median
            size_benchmark, size_median = benchmark.size_benchmark, benchmark.size_median
        else:
            # Self-contained: the benchmark is the median annotator on this image.
            count_benchmark, count_median = _median_with_keys(count_means)
            size_benchmark, size_median = _median_with_keys(size_means)

        # Normalise first, then order by the normalised value (ascending) for representation.
        count_values: dict[str, float] = dict(
            sorted(
                ((name, mean / count_median) for name, mean in count_means.items()),
                key=lambda item: item[1],
            )
        )
        size_values: dict[str, float] = dict(
            sorted(
                ((name, mean / size_median) for name, mean in size_means.items()),
                key=lambda item: item[1],
            )
        )

        granularities_by_image[img_filename] = Granularities(
            count=count_values,
            count_benchmark=count_benchmark,
            count_median=count_median,
            size=size_values,
            size_benchmark=size_benchmark,
            size_median=size_median,
            singletons=singletons,
        )

    return granularities_by_image
def _median_with_keys(values: dict[str, float]) -> tuple[list[str], float]:
"""
Computes the median value from a dictionary of values and returns it along with the corresponding keys.
Parameters:
values (dict[str, float]): A dictionary mapping keys to their corresponding float values.
Returns:
tuple[list[str], float]: A tuple containing a list of keys corresponding to the median value and the median value itself. If the number of items is odd, the list will contain one key; if even, it will contain two keys.
"""
# Sort all items by their values
sorted_items = sorted(values.items(), key=lambda item: item[1])
# If the number of items is odd, return the middle value.
if len(sorted_items) % 2 == 1:
median_index = len(sorted_items) // 2
median_keys = [sorted_items[median_index][0]]
median_value = sorted_items[median_index][1]
# If it is even, return the average of the two middle values.
else:
first_median_index = len(sorted_items) // 2 - 1
second_median_index = len(sorted_items) // 2
median_keys = [sorted_items[first_median_index][0], sorted_items[second_median_index][0]]
median_value = (sorted_items[first_median_index][1] + sorted_items[second_median_index][1]) / 2.0
return (median_keys, median_value)