Source code for iaa_od.utils.fatigue_evaluation

from typing import Any, Optional
import json
from datetime import timedelta

def extract_fatigue_data(
    result,
    timing_files: list[str],
    *,
    delta_time_minutes: int = 30,
) -> dict[str, dict[int, list]]:
    """
    Extracts fatigue data from the provided timing files and computes annotation sessions
    for each Ground Truth (GT).

    Every timing file is loaded, and the annotations of each GT are split into sessions:
    a new session starts whenever the gap between two consecutive annotations is at least
    `delta_time_minutes` minutes.

    Parameters:
        result (Result): The Result object containing the Ground Truths and Units.
        timing_files (list[str]): A list of strings representing the paths to the JSON timing files.
        delta_time_minutes (int): Minimum gap, in minutes, between two consecutive annotations
            for the later one to open a new session. Keyword-only; defaults to 30.

    Returns:
        dict[str, dict[int, list]]: A dictionary where the keys are GT names (strings) and the
        values are dictionaries mapping session IDs (integers) to lists of AnnotationTimestamp objects.

    Raises:
        ValueError: If `timing_files` is empty, if `delta_time_minutes` is not positive, or if a
            timing file is missing one of its required keys.
    """
    # Validate the arguments up front, before anything is imported or any file is opened.
    if not timing_files:
        raise ValueError("Timing files list cannot be empty.")
    if delta_time_minutes <= 0:
        raise ValueError("delta_time_minutes must be a positive number of minutes.")

    # Only needed for the type annotation of `sessions` below.
    from iaa_od import AnnotationTimestamp

    # Load all timing files into a dictionary mapping GT names to their annotation timings
    timing: dict[str, list[dict[str, Any]]] = _load_timing_files(timing_files)

    # Find all annotation sessions for each GT
    sessions: dict[str, dict[int, list[AnnotationTimestamp]]] = {
        gt_name: _find_sessions(annotations, delta_time_minutes=delta_time_minutes, result=result)
        for gt_name, annotations in timing.items()
    }
    return sessions
def per_image_fatigue_granularities(result) -> dict[str, "Granularities"]:
    """
    Compute the per-image count and size granularities of the whole dataset, normalised
    against a single dataset-wide benchmark.

    The benchmark is obtained once from `all_granularities(result)` and handed to
    `per_image_granularities`, so every image is measured against the same reference
    instead of against its own median. That shared reference is what makes the values
    comparable across images, which a temporal view needs: a value of, say, 1.4 has to
    mean the same thing on the first image of a session as on the last.

    This is the granularity counterpart to `result.alpha_per_image`, and is what
    `show_annotator_fatigue_granularity` plots against the normalised timestamps.

    The returned mapping is keyed by image filename, matching
    `AnnotationTimestamp.image_filename`. Images whose units are all singletons, and
    annotators that did not contribute to any non-singleton unit on a given image, are
    absent from the corresponding entry; the plotting code treats those as missing (NaN)
    data points.

    Parameters:
        result (Result): The Result object containing the Ground Truths and Units.

    Returns:
        dict[str, Granularities]: A mapping from image filename to that image's Granularities.
    """
    # NOTE: if all_granularities / per_image_granularities are not re-exported at the
    # top level of the package, point this import at the module that defines them.
    from iaa_od import all_granularities, per_image_granularities

    dataset_benchmark = all_granularities(result)
    per_image = per_image_granularities(result, benchmark=dataset_benchmark)
    return per_image
def _load_timing_files(filenames: list[str]) -> dict[str, list[dict[str, Any]]]: from iaa_od.models.constants import ANNOTATIONS_KEY, GT_NAME_KEY """ Function which loads JSON timing files into a dictionary mapping GT names to their corresponding annotation timings. Args: - filenames: A list of strings representing the paths to the JSON timing files. Returns: - A dictionary where the keys are GT names (strings) and the values are lists of dictionaries, each representing an annotation timing. """ timing_files: dict[str, list[dict[str, Any]]] = {} for file in filenames: with open(file, 'r') as f: obj = json.load(f) gt_name = obj.get(GT_NAME_KEY) if gt_name is None: raise ValueError(f"Timing file {file} does not contain {GT_NAME_KEY} key.") timing_files[gt_name] = obj.get(ANNOTATIONS_KEY) if timing_files[gt_name] is None: raise ValueError(f"Timing file {file} does not contain {ANNOTATIONS_KEY} key.") return timing_files def _find_sessions(annotation_data: list[dict[str, Any]], /, *, delta_time_minutes: int, result) -> dict[int, list]: """ Function which finds annotation sessions based on the provided annotation data and a specified time delta. Parameters: annotation_data (list[dict[str, Any]]): A list of dictionaries, each representing an annotation timing. delta_time_minutes (int): An integer representing the time delta in minutes to determine when a new session should start. 
Returns: dict[int, list]: A dictionary where the keys are session IDs (integers) and the values are lists """ from iaa_od import AnnotationTimestamp # Initialise session counter and sessions dictionary session_counter = 0 sessions: dict[int, list[AnnotationTimestamp]] = {} # Iterate through data to find sessions for annotation in annotation_data: set_next_session: bool = False # Create a new AnnotationTimestamp object for the current annotation current_annotation: AnnotationTimestamp = AnnotationTimestamp(annotation, result) # Look for the next annotation and create an AnnotationTimestamp object for it to compare timestamps next_annotation: Optional[AnnotationTimestamp] = None if annotation == annotation_data[-1]: # Check if not the last annotation break; next_annotation = AnnotationTimestamp(annotation_timestamp_data=annotation_data[annotation_data.index(annotation) + 1], result=result) if next_annotation is not None: # Check the time difference between the two timestamps delta_time: timedelta = next_annotation.timestamp_datetime - current_annotation.timestamp_datetime # If the time difference is greater or equal to delta_time_minutes minutes, begin a new session if delta_time >= timedelta(minutes=delta_time_minutes): set_next_session = True # Add the current annotation to the current session if session_counter not in sessions: sessions[session_counter] = [] sessions[session_counter].append(current_annotation) if set_next_session: session_counter += 1 # Normalise all session timestamps by subtracting the smallest timestamp for that session from all timestamps in that session for _, annotation_timestamps in sessions.items(): min_timestamp = min(annotation.timestamp_datetime for annotation in annotation_timestamps) min_timestamp_int = min(annotation.timestamp for annotation in annotation_timestamps) for annotation in annotation_timestamps: annotation.normalised_timestamp = annotation.timestamp_datetime - min_timestamp annotation.timestamp -= min_timestamp_int # 
Order annotations in each session by timestamp for _, annotation_timestamps in sessions.items(): annotation_timestamps.sort(key=lambda x: x.timestamp) return sessions