from typing import Any, Optional
import json
from datetime import timedelta
[docs]
def per_image_fatigue_granularities(result) -> dict[str, "Granularities"]:
    """
    Return the count and size granularities of every image, all measured against one
    dataset-wide benchmark.

    The benchmark is obtained from `all_granularities(result)` and handed to
    `per_image_granularities` as its `benchmark` argument. Because every image is scored
    against that same benchmark instead of its own median, a given value (for example 1.4)
    carries the same meaning for the first image of a session as for the last one, which is
    what makes a temporal view of the values meaningful.

    This is the granularity counterpart to `result.alpha_per_image`, and it supplies the values
    that `show_annotator_fatigue_granularity` plots against the normalised timestamps.

    The mapping is keyed by image filename, matching `AnnotationTimestamp.image_filename`.
    Images whose units are all singletons, and annotators that contributed to no
    non-singleton unit on a given image, are absent from the corresponding entry; the
    plotting code treats those as missing (NaN) data points.

    Parameters:
        result (Result): The Result object containing the Ground Truths and Units.
    Returns:
        dict[str, Granularities]: A mapping from image filename to that image's Granularities.
    """
    # Imported at call time. If all_granularities / per_image_granularities are not
    # re-exported at the top level of the package, point this import at their real module.
    from iaa_od import all_granularities, per_image_granularities

    # The benchmark is evaluated first, then reused for every image.
    return per_image_granularities(result, benchmark=all_granularities(result))
def _load_timing_files(filenames: list[str]) -> dict[str, list[dict[str, Any]]]:
    """
    Load JSON timing files into a dictionary mapping GT names to their annotation timings.

    Each file must contain a JSON object that stores the GT name under `GT_NAME_KEY` and the
    annotation timings under `ANNOTATIONS_KEY`. The annotation value is stored exactly as it
    was decoded; its inner structure is not validated here. If several files carry the same
    GT name, the entry from the file listed last replaces the earlier ones.

    Parameters:
        filenames (list[str]): Paths to the JSON timing files.
    Returns:
        dict[str, list[dict[str, Any]]]: A mapping from GT name to the annotation timings read
        from the corresponding file. Empty when `filenames` is empty.
    Raises:
        ValueError: If a file has no `GT_NAME_KEY` or no `ANNOTATIONS_KEY` entry (a JSON
        `null` value counts as missing).
    """
    # The docstring has to be the first statement of the function, so the import follows it.
    from iaa_od.models.constants import ANNOTATIONS_KEY, GT_NAME_KEY

    timing_files: dict[str, list[dict[str, Any]]] = {}
    for file in filenames:
        # JSON text is UTF-8; an explicit encoding keeps decoding independent of the locale.
        with open(file, "r", encoding="utf-8") as f:
            obj = json.load(f)

        gt_name = obj.get(GT_NAME_KEY)
        if gt_name is None:
            raise ValueError(f"Timing file {file} does not contain {GT_NAME_KEY} key.")

        # Validate before storing so the mapping never holds a None placeholder.
        annotations = obj.get(ANNOTATIONS_KEY)
        if annotations is None:
            raise ValueError(f"Timing file {file} does not contain {ANNOTATIONS_KEY} key.")

        timing_files[gt_name] = annotations
    return timing_files
def _find_sessions(annotation_data: list[dict[str, Any]], /, *, delta_time_minutes: int, result) -> dict[int, list]:
    """
    Split annotation timings into sessions and normalise the timestamps inside each session.

    The annotations are walked in the order given. Whenever the time between two consecutive
    annotations is greater than or equal to `delta_time_minutes`, the later annotation opens
    a new session. Every annotation, including the last one, is assigned to a session.

    After grouping, each session is rebased on its earliest entry: `normalised_timestamp` is
    set to the offset of `timestamp_datetime` from the session minimum, and `timestamp` has
    the session's minimum `timestamp` subtracted from it in place. Each session is then
    sorted by `timestamp`.

    Parameters:
        annotation_data (list[dict[str, Any]]): A list of dictionaries, each representing an
            annotation timing. It is not reordered before sessions are detected.
        delta_time_minutes (int): The gap, in minutes, at or above which a new session starts.
        result: Passed unchanged to every `AnnotationTimestamp` that is constructed.
    Returns:
        dict[int, list]: A dictionary whose keys are consecutive session IDs starting at 0 and
        whose values are the lists of `AnnotationTimestamp` objects belonging to that session.
        Empty when `annotation_data` is empty.
    """
    from iaa_od import AnnotationTimestamp

    session_gap = timedelta(minutes=delta_time_minutes)

    # Wrap every entry exactly once. Working on the wrapped objects by position avoids the
    # dict-equality and list.index() lookups, which pick the wrong entry for duplicates.
    timestamps = [AnnotationTimestamp(entry, result) for entry in annotation_data]

    sessions: dict[int, list[AnnotationTimestamp]] = {}
    session_counter = 0
    previous = None
    for current in timestamps:
        # A large enough pause since the previous annotation starts a new session.
        if previous is not None and current.timestamp_datetime - previous.timestamp_datetime >= session_gap:
            session_counter += 1
        sessions.setdefault(session_counter, []).append(current)
        previous = current

    for annotation_timestamps in sessions.values():
        # Rebase the session on its earliest entry.
        min_timestamp = min(annotation.timestamp_datetime for annotation in annotation_timestamps)
        min_timestamp_int = min(annotation.timestamp for annotation in annotation_timestamps)
        for annotation in annotation_timestamps:
            annotation.normalised_timestamp = annotation.timestamp_datetime - min_timestamp
            annotation.timestamp -= min_timestamp_int
        # Order the annotations of the session by their (now relative) timestamp.
        annotation_timestamps.sort(key=lambda x: x.timestamp)

    return sessions