################################################################################
# Copyright (c) 2025 José Joaquín Peralta Abadía. #
# Copyrights licensed under the MIT License. #
# See the accompanying LICENSE file for terms. #
# #
# Date: 30-01-2025 #
# Author(s): José Joaquín Peralta Abadía #
# E-mail: josejoaquin.peralta.abadia@gmail.com #
################################################################################
from typing import List, Optional, Union, Dict
import torch
from torch import Tensor
from avalanche.evaluation import Metric, GenericPluginMetric
from avalanche.evaluation.metrics.mean import Mean
from collections import defaultdict
class RMSE(Metric[float]):
    """Root mean squared error (RMSE) metric. This is a standalone metric.

    The metric keeps a running mean of the *squared* error of the
    <prediction, target> pairs passed to `update` and applies the square
    root only when `result` is called. The reported value is therefore the
    RMSE over every pattern seen since the last `reset`, independently of
    how those patterns were split into minibatches. (Averaging per-minibatch
    RMSE values instead would underestimate the RMSE and make the result
    depend on the batch size.)

    The reset method brings the metric back to its initial state. By default
    this metric in its initial state returns an RMSE value of 0.
    """

    def __init__(self):
        """Creates an instance of the standalone RMSE metric.

        By default this metric in its initial state will return an RMSE
        value of 0. The metric can be updated by using the `update` method
        while the running RMSE can be retrieved using the `result` method.
        """
        # Running mean of the squared error, weighted by number of patterns.
        self._mean_squared_error = Mean()

    @torch.no_grad()
    def update(
        self,
        predicted_y: Tensor,
        true_y: Tensor,
    ) -> None:
        """Update the running RMSE given the true and predicted values.

        :param predicted_y: The model prediction.
        :param true_y: The ground truth. Must have the same length as
            `predicted_y` along the first dimension.
        :return: None.
        :raises ValueError: if `true_y` and `predicted_y` differ in length.
        """
        if len(true_y) != len(predicted_y):
            raise ValueError("Size mismatch for true_y and predicted_y tensors")

        total_patterns = len(true_y)
        if total_patterns == 0:
            # torch.mean of an empty tensor is NaN; skip the update so an
            # empty minibatch cannot contaminate the running mean.
            return

        error = true_y - predicted_y
        if not (error.is_floating_point() or error.is_complex()):
            # torch.mean rejects integer tensors (e.g. plain integer labels).
            error = error.float()

        mse = float(torch.mean(error**2))
        self._mean_squared_error.update(mse, total_patterns)

    def result(self) -> float:
        """Retrieves the running RMSE.

        Calling this method will not change the internal state of the metric.

        :return: The current running RMSE, a non-negative float.
        """
        # Square root of the accumulated mean squared error.
        return self._mean_squared_error.result() ** 0.5

    def reset(self) -> None:
        """Resets the metric.

        :return: None.
        """
        self._mean_squared_error.reset()
class TaskAwareRMSE(Metric[Dict[int, float]]):
    """The task-aware RMSE metric.

    The metric computes a dictionary of <task_label, RMSE value> pairs.
    update/result/reset methods are all task-aware.
    """

    def __init__(self):
        """Creates an instance of the task-aware RMSE metric."""
        # One standalone RMSE metric per task label, created on first update.
        self._mean_RMSE = defaultdict(RMSE)

    @torch.no_grad()
    def update(
        self,
        predicted_y: Tensor,
        true_y: Tensor,
        task_labels: Union[int, Tensor],
    ) -> None:
        """Update the running RMSE given the true and predicted values.

        Parameter `task_labels` decides which entries of the inner
        dictionary are updated: if an int, only the entry of that task is
        updated with the whole minibatch. If a Tensor, the minibatch is
        split by task label and each task's entry is updated once with the
        patterns that belong to it.

        :param predicted_y: The model prediction.
        :param true_y: The ground truth.
        :param task_labels: the int task label associated to the current
            experience or the task labels vector showing the task label
            for each pattern.
        :return: None.
        :raises ValueError: on a length mismatch between the tensors, or if
            `task_labels` is neither an int nor a Tensor.
        """
        if len(true_y) != len(predicted_y):
            raise ValueError("Size mismatch for true_y and predicted_y tensors")
        if isinstance(task_labels, Tensor) and len(task_labels) != len(true_y):
            raise ValueError("Size mismatch for true_y and task_labels tensors")

        if isinstance(task_labels, int):
            self._mean_RMSE[task_labels].update(predicted_y, true_y)
        elif isinstance(task_labels, Tensor):
            # Update once per task with all of its patterns, rather than
            # once per pattern. A task then gets the same value whether its
            # label arrives as an int or as a per-pattern vector, and the
            # Python-level loop runs per task instead of per pattern.
            flat_labels = task_labels.reshape(-1)
            for label in torch.unique(flat_labels):
                mask = (flat_labels == label).to(predicted_y.device)
                self._mean_RMSE[label.item()].update(
                    predicted_y[mask], true_y[mask]
                )
        else:
            raise ValueError(
                f"Task label type: {type(task_labels)}, "
                "expected int or Tensor"
            )

    def result(self, task_label: Optional[int] = None) -> Dict[int, float]:
        """
        Retrieves the running RMSE.

        Calling this method will not change the internal state of the metric.

        :param task_label: if None, return the entire dictionary of RMSE
            for each task. Otherwise return the dictionary
            `{task_label: RMSE}`.
        :return: A dict of running RMSE for each task label, where each
            value is a non-negative float.
        """
        assert task_label is None or isinstance(task_label, int)
        if task_label is None:
            return {k: v.result() for k, v in self._mean_RMSE.items()}

        # Look up without indexing: indexing the defaultdict would insert an
        # entry for an unseen task and so mutate the metric state.
        task_metric = self._mean_RMSE.get(task_label)
        if task_metric is None:
            # Unseen task: report the value of a metric in its initial state.
            return {task_label: RMSE().result()}
        return {task_label: task_metric.result()}

    def reset(self, task_label: Optional[int] = None) -> None:
        """
        Resets the metric.

        :param task_label: if None, reset the entire dictionary.
            Otherwise, reset the value associated to `task_label`.
        :return: None.
        """
        assert task_label is None or isinstance(task_label, int)
        if task_label is None:
            self._mean_RMSE = defaultdict(RMSE)
        elif task_label in self._mean_RMSE:
            # Nothing to reset for a task that was never updated; checking
            # membership avoids creating an empty entry for it.
            self._mean_RMSE[task_label].reset()
class RMSEPluginMetric(GenericPluginMetric[float, RMSE]):
    """
    Common parent of the RMSE plugin metrics defined in this module.

    It wraps a standalone `RMSE` metric and feeds it with the minibatch
    output and targets of the strategy.
    """

    def __init__(self, reset_at, emit_at, mode, split_by_task=False):
        """Creates the RMSE plugin.

        :param reset_at: forwarded unchanged to `GenericPluginMetric`.
        :param emit_at: forwarded unchanged to `GenericPluginMetric`.
        :param mode: forwarded unchanged to `GenericPluginMetric`.
        :param split_by_task: accepted but not used by this constructor;
            a plain (non task-aware) `RMSE` is always wrapped.
        """
        wrapped_metric = RMSE()
        super().__init__(
            wrapped_metric,
            reset_at=reset_at,
            emit_at=emit_at,
            mode=mode,
        )

    def reset(self) -> None:
        """Resets the wrapped metric held in `self._metric`."""
        self._metric.reset()

    def result(self) -> float:
        """Returns the current value of the wrapped metric."""
        return self._metric.result()

    def update(self, strategy):
        """Updates the wrapped metric with `strategy.mb_output` as the
        prediction and `strategy.mb_y` as the ground truth."""
        self._metric.update(strategy.mb_output, strategy.mb_y)
class RMSEPerTaskPluginMetric(
    GenericPluginMetric[Dict[int, float], TaskAwareRMSE]
):
    """
    Plugin metric that wraps a `TaskAwareRMSE`, so its result is a
    dictionary of <task_label, RMSE value> pairs.
    """

    def __init__(self, reset_at, emit_at, mode):
        """Creates the per-task RMSE plugin.

        :param reset_at: forwarded unchanged to `GenericPluginMetric`.
        :param emit_at: forwarded unchanged to `GenericPluginMetric`.
        :param mode: forwarded unchanged to `GenericPluginMetric`.
        """
        wrapped_metric = TaskAwareRMSE()
        super().__init__(
            wrapped_metric,
            reset_at=reset_at,
            emit_at=emit_at,
            mode=mode,
        )

    def reset(self) -> None:
        """Resets the wrapped metric held in `self._metric` (all tasks)."""
        self._metric.reset()

    def result(self) -> Dict[int, float]:
        """Returns the wrapped metric's result for every task."""
        return self._metric.result()

    def update(self, strategy):
        """Updates the wrapped metric with `strategy.mb_output`,
        `strategy.mb_y` and the task labels in `strategy.mb_task_id`."""
        self._metric.update(
            strategy.mb_output,
            strategy.mb_y,
            strategy.mb_task_id,
        )
class MinibatchRMSE(RMSEPluginMetric):
    """
    The minibatch plugin RMSE metric.

    This metric only works at training time. It is reset and emitted at
    every iteration, so each reported value covers the patterns of a single
    minibatch.

    If a more coarse-grained logging is needed, consider using
    :class:`EpochRMSE` instead.
    """

    def __init__(self):
        """
        Creates an instance of the MinibatchRMSE metric.
        """
        super().__init__(
            reset_at="iteration",
            emit_at="iteration",
            mode="train",
        )

    def __str__(self):
        """Returns the name of this metric."""
        return "Top1_RMSE_MB"
class EpochRMSE(RMSEPluginMetric):
    """
    The RMSE over a single training epoch.

    This plugin metric only works at training time. It is reset and emitted
    once per epoch, so each reported value is the running result of the
    underlying `RMSE` metric over the patterns encountered in that epoch.
    """

    def __init__(self):
        """
        Creates an instance of the EpochRMSE metric.
        """
        super().__init__(
            reset_at="epoch",
            emit_at="epoch",
            mode="train",
        )

    def __str__(self):
        """Returns the name of this metric."""
        return "Top1_RMSE_Epoch"
class RunningEpochRMSE(RMSEPluginMetric):
    """
    The running RMSE across all minibatches up to the current
    epoch iteration.

    This plugin metric only works at training time. It is emitted at every
    iteration but reset only once per epoch, so each reported value covers
    all the patterns seen so far in the current epoch.
    """

    def __init__(self):
        """
        Creates an instance of the RunningEpochRMSE metric.
        """
        super().__init__(
            reset_at="epoch",
            emit_at="iteration",
            mode="train",
        )

    def __str__(self):
        """Returns the name of this metric."""
        return "Top1_RunningRMSE_Epoch"
class ExperienceRMSE(RMSEPluginMetric):
    """
    Plugin metric that reports, at the end of each experience, the RMSE
    over all patterns seen in that experience.

    This metric only works at eval time.
    """

    def __init__(self):
        """
        Creates an instance of the ExperienceRMSE metric.
        """
        super().__init__(
            reset_at="experience",
            emit_at="experience",
            mode="eval",
        )

    def __str__(self):
        """Returns the name of this metric."""
        return "Top1_RMSE_Exp"
class StreamRMSE(RMSEPluginMetric):
    """
    Plugin metric that reports, at the end of the entire stream of
    experiences, the RMSE over all patterns seen in all experiences.

    This metric only works at eval time.
    """

    def __init__(self):
        """
        Creates an instance of the StreamRMSE metric.
        """
        super().__init__(
            reset_at="stream",
            emit_at="stream",
            mode="eval",
        )

    def __str__(self):
        """Returns the name of this metric."""
        return "Top1_RMSE_Stream"
class TrainedExperienceRMSE(RMSEPluginMetric):
    """
    Stream-level plugin metric that only accumulates the RMSE of the
    experiences the model has been trained on so far.

    This metric only works at eval time.
    """

    def __init__(self):
        """
        Creates an instance of the TrainedExperienceRMSE metric by first
        constructing the parent RMSEPluginMetric.
        """
        super().__init__(
            reset_at="stream",
            emit_at="stream",
            mode="eval",
        )
        # Index of the most recent experience the model was trained on.
        self._current_experience = 0

    def after_training_exp(self, strategy):
        """Records the experience that was just trained on and clears the
        accumulated value before delegating to the parent callback."""
        trained_exp = strategy.experience.current_experience
        self._current_experience = trained_exp
        # Start from a clean state after learning from a new experience.
        self.reset()
        return super().after_training_exp(strategy)

    def update(self, strategy):
        """
        Only update the RMSE with results from experiences that have been
        trained on.
        """
        evaluated_exp = strategy.experience.current_experience
        if evaluated_exp > self._current_experience:
            # Experience not trained on yet: leave the metric untouched.
            return
        RMSEPluginMetric.update(self, strategy)

    def __str__(self):
        """Returns the name of this metric."""
        return "RMSE_On_Trained_Experiences"
def rmse_metrics(
    *,
    minibatch=False,
    epoch=False,
    epoch_running=False,
    experience=False,
    stream=False,
    trained_experience=False,
) -> List[RMSEPluginMetric]:
    """
    Helper method that can be used to obtain the desired set of
    plugin metrics.

    :param minibatch: If True, will return a metric able to log
        the minibatch RMSE at training time.
    :param epoch: If True, will return a metric able to log
        the epoch RMSE at training time.
    :param epoch_running: If True, will return a metric able to log
        the running epoch RMSE at training time.
    :param experience: If True, will return a metric able to log
        the RMSE on each evaluation experience.
    :param stream: If True, will return a metric able to log
        the RMSE over the entire evaluation stream of experiences.
    :param trained_experience: If True, will return a metric able to log
        the evaluation RMSE only for experiences that the model has been
        trained on.
    :return: A list of plugin metrics, one new instance per enabled flag,
        in the order the flags are declared.
    """
    # Each flag paired with the plugin class it enables, in output order.
    selection = (
        (minibatch, MinibatchRMSE),
        (epoch, EpochRMSE),
        (epoch_running, RunningEpochRMSE),
        (experience, ExperienceRMSE),
        (stream, StreamRMSE),
        (trained_experience, TrainedExperienceRMSE),
    )
    metrics: List[RMSEPluginMetric] = [
        plugin_cls() for enabled, plugin_cls in selection if enabled
    ]
    return metrics
# Names exported by a star-import of this module.
# NOTE(review): `RMSEPluginMetric` is defined in this file and appears in the
# return annotation of `rmse_metrics`, but is not listed here — confirm
# whether the omission is intentional.
__all__ = [
    "RMSE",
    "TaskAwareRMSE",
    "MinibatchRMSE",
    "EpochRMSE",
    "RunningEpochRMSE",
    "ExperienceRMSE",
    "StreamRMSE",
    "TrainedExperienceRMSE",
    "rmse_metrics",
    "RMSEPerTaskPluginMetric",
]