################################################################################
# Copyright (c) 2021 ContinualAI.                                              #
# Copyrights licensed under the MIT License.                                   #
# See the accompanying LICENSE file for terms.                                 #
#                                                                              #
# Date: 19-01-2021                                                             #
# Author(s): Lorenzo Pellegrini                                                #
# E-mail: contact@continualai.org                                              #
# Website: www.continualai.org                                                 #
################################################################################
import os
import warnings
from typing import Optional, List

from psutil import Process

from avalanche.evaluation import Metric, PluginMetric, GenericPluginMetric
from avalanche.evaluation.metrics import Mean


class CPUUsage(Metric[float]):
    """
    The standalone CPU usage metric.

    Instances of this metric compute the average CPU usage as a float value.
    The metric starts tracking the CPU usage when the `update` method is
    called for the first time. That is, the tracking does not start at the
    time the constructor is invoked.

    Calling the `update` method more than twice will update the metric to the
    average usage between the first and the last call to `update`.

    The result, obtained using the `result` method, is the usage computed
    as stated above.

    The `reset` method will bring the metric to its initial state. By
    default, this metric in its initial state will return a usage value of 0.
    """

    def __init__(self):
        """
        Creates an instance of the standalone CPU usage metric.

        By default, this metric in its initial state will return a CPU usage
        value of 0. The metric can be updated by using the `update` method,
        while the average CPU usage can be retrieved using the `result`
        method.
        """
        self._mean_usage = Mean()
        """
        The mean utility that will be used to store the average usage.
        """

        self._process_handle: Optional[Process] = None
        """
        The process handle, lazily initialized.
        """

        self._first_update = True
        """
        An internal flag to keep track of the first call to the `update`
        method.
        """

    def update(self) -> None:
        """
        Update the running CPU usage.

        For more info on how to set the starting moment see the class
        description.

        :return: None.
        """
        if self._first_update:
            self._process_handle = Process(os.getpid())

        assert self._process_handle is not None
        last_time = getattr(
            self._process_handle, "_last_sys_cpu_times", None
        )
        utilization = self._process_handle.cpu_percent()
        current_time = getattr(
            self._process_handle, "_last_sys_cpu_times", None
        )

        if self._first_update:
            self._first_update = False
        else:
            if current_time is None or last_time is None:
                warnings.warn(
                    "CPUUsage can't detect the elapsed time. It is "
                    "recommended to update avalanche to the latest "
                    "version."
                )
                # Fallback, shouldn't happen
                current_time = 1.0
                last_time = 0.0
            self._mean_usage.update(utilization, current_time - last_time)

    def result(self) -> float:
        """
        Retrieves the average CPU usage.

        Calling this method will not change the internal state of the metric.

        :return: The average CPU usage, as a float value.
        """
        return self._mean_usage.result()

    def reset(self) -> None:
        """
        Resets the metric.

        :return: None.
        """
        self._mean_usage.reset()
        self._process_handle = None
        self._first_update = True
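

# A minimal usage sketch of the standalone metric (illustrative, not part of
# the module; `do_some_work` is a hypothetical workload):
#
#   metric = CPUUsage()
#   metric.update()          # first call: starts tracking, records nothing
#   do_some_work()
#   metric.update()          # records the usage elapsed since the first call
#   print(metric.result())   # average CPU usage so far (percent, via psutil)
#   metric.reset()           # back to the initial state: result() == 0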


class CPUPluginMetric(GenericPluginMetric[float, CPUUsage]):
    def __init__(self, reset_at, emit_at, mode):
        super(CPUPluginMetric, self).__init__(
            CPUUsage(), reset_at=reset_at, emit_at=emit_at, mode=mode
        )

    def update(self, strategy):
        self._metric.update()
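

# How the (reset_at, emit_at, mode) triple is combined by the concrete
# subclasses below (a summary sketch, not executable code):
#
#   reset_at="iteration", emit_at="iteration", mode="train"
#       -> MinibatchCPUUsage: one value per training minibatch
#   reset_at="epoch", emit_at="epoch", mode="train"
#       -> EpochCPUUsage: one value per training epoch
#   reset_at="epoch", emit_at="iteration", mode="train"
#       -> RunningEpochCPUUsage: running average, emitted every iteration
#   reset_at="experience", emit_at="experience", mode="eval"
#       -> ExperienceCPUUsage: one value per evaluation experience
#   reset_at="stream", emit_at="stream", mode="eval"
#       -> StreamCPUUsage: one value for the whole evaluation stream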


class MinibatchCPUUsage(CPUPluginMetric):
    """
    The minibatch CPU usage metric.

    This plugin metric only works at training time.

    This metric "logs" the CPU usage for each iteration.

    If a more coarse-grained logging is needed, consider using
    :class:`EpochCPUUsage`.
    """

    def __init__(self):
        """
        Creates an instance of the minibatch CPU usage metric.
        """
        super(MinibatchCPUUsage, self).__init__(
            reset_at="iteration", emit_at="iteration", mode="train"
        )

    def before_training_iteration(self, strategy):
        super().before_training_iteration(strategy)
        self.update(strategy)  # start tracking the CPU usage

    def __str__(self):
        return "CPUUsage_MB"


class EpochCPUUsage(CPUPluginMetric):
    """
    The epoch CPU usage metric.

    This plugin metric only works at training time.

    The average usage will be logged after each epoch.
    """

    def __init__(self):
        """
        Creates an instance of the epoch CPU usage metric.
        """
        super(EpochCPUUsage, self).__init__(
            reset_at="epoch", emit_at="epoch", mode="train"
        )

    def before_training_epoch(self, strategy):
        super().before_training_epoch(strategy)
        self.update(strategy)  # start tracking the CPU usage

    def __str__(self):
        return "CPUUsage_Epoch"


class RunningEpochCPUUsage(CPUPluginMetric):
    """
    The running epoch CPU usage metric.

    This plugin metric only works at training time.

    After each iteration, the metric logs the average CPU usage up
    to the current epoch iteration.
    """

    def __init__(self):
        """
        Creates an instance of the running epoch CPU usage metric.
        """
        self._mean = Mean()
        super(RunningEpochCPUUsage, self).__init__(
            reset_at="epoch", emit_at="iteration", mode="train"
        )

    def result(self) -> float:
        return self._mean.result()

    def before_training_epoch(self, strategy):
        super().before_training_epoch(strategy)
        self._mean.reset()

    def before_training_iteration(self, strategy):
        super().before_training_iteration(strategy)
        self.update(strategy)  # start tracking the CPU usage

    def after_training_iteration(self, strategy):
        super().after_training_iteration(strategy)
        self.update(strategy)
        self._mean.update(self._metric.result())
        self._metric.reset()
        return self._package_result(strategy)

    def __str__(self):
        return "RunningCPUUsage_Epoch"


class ExperienceCPUUsage(CPUPluginMetric):
    """
    The average experience CPU usage metric.

    This plugin metric works only at eval time.

    After each experience, this metric emits the average CPU usage on that
    experience.
    """

    def __init__(self):
        """
        Creates an instance of the experience CPU usage metric.
        """
        super(ExperienceCPUUsage, self).__init__(
            reset_at="experience", emit_at="experience", mode="eval"
        )

    def before_eval_exp(self, strategy):
        super().before_eval_exp(strategy)
        self.update(strategy)  # start tracking the CPU usage

    def __str__(self):
        return "CPUUsage_Exp"


class StreamCPUUsage(CPUPluginMetric):
    """
    The average stream CPU usage metric.

    This plugin metric works only at eval time.

    After the entire evaluation stream, this metric emits
    the average CPU usage on all experiences.
    """

    def __init__(self):
        """
        Creates an instance of the stream CPU usage metric.
        """
        super(StreamCPUUsage, self).__init__(
            reset_at="stream", emit_at="stream", mode="eval"
        )

    def before_eval(self, strategy):
        super().before_eval(strategy)
        self.update(strategy)  # start tracking the CPU usage

    def __str__(self):
        return "CPUUsage_Stream"


def cpu_usage_metrics(
    *,
    minibatch=False,
    epoch=False,
    epoch_running=False,
    experience=False,
    stream=False,
) -> List[CPUPluginMetric]:
    """
    Helper method that can be used to obtain the desired set of
    plugin metrics.

    :param minibatch: If True, will return a metric able to log the minibatch
        CPU usage.
    :param epoch: If True, will return a metric able to log the epoch
        CPU usage.
    :param epoch_running: If True, will return a metric able to log the
        running epoch CPU usage.
    :param experience: If True, will return a metric able to log the
        experience CPU usage.
    :param stream: If True, will return a metric able to log the evaluation
        stream CPU usage.

    :return: A list of plugin metrics.
    """
    metrics: List[CPUPluginMetric] = []
    if minibatch:
        metrics.append(MinibatchCPUUsage())

    if epoch:
        metrics.append(EpochCPUUsage())

    if epoch_running:
        metrics.append(RunningEpochCPUUsage())

    if experience:
        metrics.append(ExperienceCPUUsage())

    if stream:
        metrics.append(StreamCPUUsage())

    return metrics
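

# A hedged integration sketch (illustrative): attaching these metrics to an
# Avalanche EvaluationPlugin. The logger and strategy choices here are
# assumptions, not requirements of this module:
#
#   from avalanche.training.plugins import EvaluationPlugin
#   from avalanche.logging import InteractiveLogger
#
#   eval_plugin = EvaluationPlugin(
#       cpu_usage_metrics(
#           minibatch=True,
#           epoch=True,
#           epoch_running=True,
#           experience=True,
#           stream=True,
#       ),
#       loggers=[InteractiveLogger()],
#   )
#   # Then pass `evaluator=eval_plugin` when creating a strategy (e.g. Naive).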


__all__ = [
"CPUUsage",
"MinibatchCPUUsage",
"EpochCPUUsage",
"RunningEpochCPUUsage",
"ExperienceCPUUsage",
"StreamCPUUsage",
"cpu_usage_metrics",
]