Source code for avalanche.evaluation.metrics.r2

################################################################################
# Copyright (c) 2025 José Joaquín Peralta Abadía.                              #
# Copyrights licensed under the MIT License.                                   #
# See the accompanying LICENSE file for terms.                                 #
#                                                                              #
# Date: 30-01-2025                                                             #
# Author(s): José Joaquín Peralta Abadía                                       #
# E-mail: josejoaquin.peralta.abadia@gmail.com                                 #
################################################################################

from typing import List, Optional, Union, Dict

import torch
from torch import Tensor
from avalanche.evaluation import Metric, GenericPluginMetric
from avalanche.evaluation.metrics.mean import Mean
from collections import defaultdict

from torchmetrics.functional import r2_score


[docs]class R2(Metric[float]): """R2 metric. This is a standalone metric. The update method computes the R2 incrementally by keeping a running average of the <prediction, target> pairs of Tensors provided over time. The "prediction" and "target" tensors may contain plain labels or one-hot/logit vectors. Each time `result` is called, this metric emits the average R2 across all predictions made since the last `reset`. The reset method will bring the metric to its initial state. By default this metric in its initial state will return an R2 value of 0. """
[docs] def __init__(self): """Creates an instance of the standalone R2 metric. By default this metric in its initial state will return an R2 value of 0. The metric can be updated by using the `update` method while the running R2 can be retrieved using the `result` method. """ self._mean_R2 = Mean() """The mean utility that will be used to store the running R2."""
@torch.no_grad() def update( self, predicted_y: Tensor, true_y: Tensor, ) -> None: """Update the running R2 given the true and predicted labels. :param predicted_y: The model prediction. Both labels and logit vectors are supported. :param true_y: The ground truth. Both labels and one-hot vectors are supported. :return: None. """ if len(true_y) != len(predicted_y): raise ValueError("Size mismatch for true_y and predicted_y tensors") rmse = float(r2_score(true_y, predicted_y)) total_patterns = len(true_y) self._mean_R2.update(rmse, total_patterns) def result(self) -> float: """Retrieves the running R2. Calling this method will not change the internal state of the metric. :return: The current running R2, which is a float value between 0 and 1. """ return self._mean_R2.result() def reset(self) -> None: """Resets the metric. :return: None. """ self._mean_R2.reset()
[docs]class TaskAwareR2(Metric[Dict[int, float]]): """The task-aware R2 metric. The metric computes a dictionary of <task_label, R2 value> pairs. update/result/reset methods are all task-aware. """
[docs] def __init__(self): """Creates an instance of the task-aware R2 metric.""" self._mean_R2 = defaultdict(R2) """ The mean utility that will be used to store the running R2 for each task label. """
@torch.no_grad() def update( self, predicted_y: Tensor, true_y: Tensor, task_labels: Union[float, Tensor], ) -> None: """Update the running R2 given the true and predicted labels. Parameter `task_labels` is used to decide how to update the inner dictionary: if Float, only the dictionary value related to that task is updated. If Tensor, all the dictionary elements belonging to the task labels will be updated. :param predicted_y: The model prediction. Both labels and logit vectors are supported. :param true_y: The ground truth. Both labels and one-hot vectors are supported. :param task_labels: the int task label associated to the current experience or the task labels vector showing the task label for each pattern. :return: None. """ if len(true_y) != len(predicted_y): raise ValueError("Size mismatch for true_y and predicted_y tensors") if isinstance(task_labels, Tensor) and len(task_labels) != len(true_y): raise ValueError("Size mismatch for true_y and task_labels tensors") if isinstance(task_labels, int): self._mean_R2[task_labels].update(predicted_y, true_y) elif isinstance(task_labels, Tensor): for pred, true, t in zip(predicted_y, true_y, task_labels): if isinstance(t, Tensor): t = t.item() self._mean_R2[t].update(pred.unsqueeze(0), true.unsqueeze(0)) else: raise ValueError( f"Task label type: {type(task_labels)}, " f"expected int/float or Tensor" ) def result(self, task_label: Optional[int] = None) -> Dict[int, float]: """ Retrieves the running R2. Calling this method will not change the internal state of the metric. task label is ignored if `self.split_by_task=False`. :param task_label: if None, return the entire dictionary of R2 for each task. Otherwise return the dictionary `{task_label: R2}`. :return: A dict of running R2 for each task label, where each value is a float value between 0 and 1. """ assert task_label is None or isinstance(task_label, int) if task_label is None: return {k: v.result() for k, v in self._mean_R2.items()} else: return {task_label: self._mean_R2[task_label].result()} def reset(self, task_label=None) -> None: """ Resets the metric. task label is ignored if `self.split_by_task=False`. :param task_label: if None, reset the entire dictionary. Otherwise, reset the value associated to `task_label`. :return: None. """ assert task_label is None or isinstance(task_label, int) if task_label is None: self._mean_R2 = defaultdict(R2) else: self._mean_R2[task_label].reset()
class R2PluginMetric(GenericPluginMetric[float, R2]): """ Base class for all R2 plugin metrics """ def __init__(self, reset_at, emit_at, mode, split_by_task=False): """Creates the R2 plugin :param reset_at: :param emit_at: :param mode: :param split_by_task: whether to compute task-aware R2 or not. """ super().__init__(R2(), reset_at=reset_at, emit_at=emit_at, mode=mode) def reset(self) -> None: self._metric.reset() def result(self) -> float: return self._metric.result() def update(self, strategy): self._metric.update(strategy.mb_output, strategy.mb_y)
[docs]class R2PerTaskPluginMetric(GenericPluginMetric[Dict[int, float], TaskAwareR2]): """ Base class for all R2 plugin metrics """
[docs] def __init__(self, reset_at, emit_at, mode): """Creates the R2 plugin :param reset_at: :param emit_at: :param mode: :param split_by_task: whether to compute task-aware R2 or not. """ super().__init__(TaskAwareR2(), reset_at=reset_at, emit_at=emit_at, mode=mode)
def reset(self) -> None: self._metric.reset() def result(self) -> Dict[int, float]: return self._metric.result() def update(self, strategy): self._metric.update(strategy.mb_output, strategy.mb_y, strategy.mb_task_id)
[docs]class MinibatchR2(R2PluginMetric): """ The minibatch plugin R2 metric. This metric only works at training time. This metric computes the average R2 over patterns from a single minibatch. It reports the result after each iteration. If a more coarse-grained logging is needed, consider using :class:`EpochR2` instead. """
[docs] def __init__(self): """ Creates an instance of the MinibatchR2 metric. """ super(MinibatchR2, self).__init__( reset_at="iteration", emit_at="iteration", mode="train" )
def __str__(self): return "Top1_R2_MB"
[docs]class EpochR2(R2PluginMetric): """ The average R2 over a single training epoch. This plugin metric only works at training time. The R2 will be logged after each training epoch by computing the number of correctly predicted patterns during the epoch divided by the overall number of patterns encountered in that epoch. """
[docs] def __init__(self): """ Creates an instance of the EpochR2 metric. """ super(EpochR2, self).__init__(reset_at="epoch", emit_at="epoch", mode="train")
def __str__(self): return "Top1_R2_Epoch"
[docs]class RunningEpochR2(R2PluginMetric): """ The average R2 across all minibatches up to the current epoch iteration. This plugin metric only works at training time. At each iteration, this metric logs the R2 averaged over all patterns seen so far in the current epoch. The metric resets its state after each training epoch. """
[docs] def __init__(self): """ Creates an instance of the RunningEpochR2 metric. """ super(RunningEpochR2, self).__init__( reset_at="epoch", emit_at="iteration", mode="train" )
def __str__(self): return "Top1_RunningR2_Epoch"
[docs]class ExperienceR2(R2PluginMetric): """ At the end of each experience, this plugin metric reports the average R2 over all patterns seen in that experience. This metric only works at eval time. """
[docs] def __init__(self): """ Creates an instance of ExperienceR2 metric """ super(ExperienceR2, self).__init__( reset_at="experience", emit_at="experience", mode="eval" )
def __str__(self): return "Top1_R2_Exp"
[docs]class StreamR2(R2PluginMetric): """ At the end of the entire stream of experiences, this plugin metric reports the average R2 over all patterns seen in all experiences. This metric only works at eval time. """
[docs] def __init__(self): """ Creates an instance of StreamR2 metric """ super(StreamR2, self).__init__(reset_at="stream", emit_at="stream", mode="eval")
def __str__(self): return "Top1_R2_Stream"
[docs]class TrainedExperienceR2(R2PluginMetric): """ At the end of each experience, this plugin metric reports the average R2 for only the experiences that the model has been trained on so far. This metric only works at eval time. """
[docs] def __init__(self): """ Creates an instance of TrainedExperienceR2 metric by first constructing R2PluginMetric """ super(TrainedExperienceR2, self).__init__( reset_at="stream", emit_at="stream", mode="eval" ) self._current_experience = 0
def after_training_exp(self, strategy): self._current_experience = strategy.experience.current_experience # Reset average after learning from a new experience self.reset() return super().after_training_exp(strategy) def update(self, strategy): """ Only update the R2 with results from experiences that have been trained on """ if strategy.experience.current_experience <= self._current_experience: R2PluginMetric.update(self, strategy) def __str__(self): return "R2_On_Trained_Experiences"
[docs]def r2_metrics( *, minibatch=False, epoch=False, epoch_running=False, experience=False, stream=False, trained_experience=False, ) -> List[R2PluginMetric]: """ Helper method that can be used to obtain the desired set of plugin metrics. :param minibatch: If True, will return a metric able to log the minibatch R2 at training time. :param epoch: If True, will return a metric able to log the epoch R2 at training time. :param epoch_running: If True, will return a metric able to log the running epoch R2 at training time. :param experience: If True, will return a metric able to log the R2 on each evaluation experience. :param stream: If True, will return a metric able to log the R2 averaged over the entire evaluation stream of experiences. :param trained_experience: If True, will return a metric able to log the average evaluation R2 only for experiences that the model has been trained on :return: A list of plugin metrics. """ metrics: List[R2PluginMetric] = [] if minibatch: metrics.append(MinibatchR2()) if epoch: metrics.append(EpochR2()) if epoch_running: metrics.append(RunningEpochR2()) if experience: metrics.append(ExperienceR2()) if stream: metrics.append(StreamR2()) if trained_experience: metrics.append(TrainedExperienceR2()) return metrics
__all__ = [ "R2", "TaskAwareR2", "MinibatchR2", "EpochR2", "RunningEpochR2", "ExperienceR2", "StreamR2", "TrainedExperienceR2", "r2_metrics", "R2PerTaskPluginMetric", ]