Source code for avalanche.evaluation.metrics.gpu_usage

################################################################################
# Copyright (c) 2021 ContinualAI.                                              #
# Copyrights licensed under the MIT License.                                   #
# See the accompanying LICENSE file for terms.                                 #
#                                                                              #
# Date: 19-01-2021                                                             #
# Author(s): Vincenzo Lomonaco, Lorenzo Pellegrini                             #
# E-mail: contact@continualai.org                                              #
# Website: www.continualai.org                                                 #
################################################################################


import GPUtil
from threading import Thread
import time
import warnings
from typing import Optional, TYPE_CHECKING, List

from avalanche.evaluation import Metric, PluginMetric, GenericPluginMetric
from avalanche.evaluation.metric_results import MetricResult

if TYPE_CHECKING:
    from avalanche.training.templates.supervised import SupervisedTemplate


class MaxGPU(Metric[float]):
    """
    The standalone GPU usage metric.

    Important: this metric approximates the real maximum GPU usage, since it
    samples the GPU load at discrete time intervals.

    Instances of this metric keep track of the maximum GPU usage percentage
    detected.

    The `start_thread` method starts the usage tracking.
    The `stop_thread` method stops the tracking.
    The result, obtained using the `result` method, is the maximum GPU load
    expressed as a percentage.

    The `reset` method will bring the metric to its initial state. By default,
    this metric in its initial state will return a usage value of 0.
    """

    def __init__(self, gpu_id, every=0.5):
        """
        Creates an instance of the GPU usage metric.

        :param gpu_id: GPU device ID.
        :param every: seconds after which the maximum GPU usage is updated.
        """
        self.every = every
        self.gpu_id = gpu_id

        n_gpus = len(GPUtil.getGPUs())
        if n_gpus == 0:
            warnings.warn("Your system has no GPU!")
            self.gpu_id = None
        elif gpu_id < 0:
            warnings.warn(
                "GPU metric called with negative GPU id. "
                "GPU logging disabled."
            )
            self.gpu_id = None
        else:
            if gpu_id >= n_gpus:
                warnings.warn(f"GPU {gpu_id} not found. Using GPU 0.")
                self.gpu_id = 0

        self.thread = None
        """
        Thread executing GPU monitoring code
        """

        self.stop_f = False
        """
        Flag to stop the thread
        """

        self.max_usage = 0
        """
        Main metric result. Max GPU usage.
        """

    def _f(self):
        """
        Until a stop signal is received, this function samples the GPU load
        every `every` seconds and keeps track of the maximum value observed.
        """
        start_time = time.monotonic()
        while not self.stop_f:
            # GPU load expressed as a percentage in [0, 100]
            gpu_perc = GPUtil.getGPUs()[self.gpu_id].load * 100
            if gpu_perc > self.max_usage:
                self.max_usage = gpu_perc
            time.sleep(
                self.every - ((time.monotonic() - start_time) % self.every)
            )

    def start_thread(self):
        if self.gpu_id is not None:
            assert not self.thread, (
                "Trying to start thread without joining the previous."
            )
            self.thread = Thread(target=self._f, daemon=True)
            self.thread.start()

    def stop_thread(self):
        if self.thread:
            self.stop_f = True
            self.thread.join()
            self.stop_f = False
            self.thread = None

    def reset(self) -> None:
        """
        Resets the metric.

        :return: None.
        """
        self.max_usage = 0

    def result(self) -> Optional[float]:
        """
        Returns the max GPU load detected so far.

        :return: The maximum GPU usage as a percentage in the range [0, 100].
        """
        return self.max_usage

    def update(self):
        pass
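# Illustrative usage only (not part of the original module): a minimal sketch
# of how the standalone MaxGPU metric could be driven around an arbitrary GPU
# workload. The `workload` callable below is a hypothetical placeholder.
def _example_standalone_max_gpu(workload=None):
    """Hedged sketch: track the peak load of GPU 0 while `workload` runs."""
    metric = MaxGPU(gpu_id=0, every=0.5)
    metric.start_thread()           # spawn the background sampling thread
    try:
        if workload is not None:
            workload()              # any GPU work to be monitored
    finally:
        metric.stop_thread()        # signal the thread to stop and join it
    peak = metric.result()          # max GPU load as a percentage in [0, 100]
    metric.reset()                  # bring the metric back to its initial state
    return peak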
class GPUPluginMetric(GenericPluginMetric[float]):
    def __init__(self, gpu_id, every, reset_at, emit_at, mode):
        self.gpu_id = gpu_id
        self._gpu = MaxGPU(gpu_id, every)
        super(GPUPluginMetric, self).__init__(
            self._gpu, reset_at=reset_at, emit_at=emit_at, mode=mode
        )

    def update(self, strategy):
        self._gpu.update()
class MinibatchMaxGPU(GPUPluginMetric):
    """
    The Minibatch Max GPU metric.

    This plugin metric only works at training time.
    """

    def __init__(self, gpu_id, every=0.5):
        """
        Creates an instance of the Minibatch Max GPU metric.

        :param gpu_id: GPU device ID.
        :param every: seconds after which the maximum GPU usage is updated.
        """
        super(MinibatchMaxGPU, self).__init__(
            gpu_id,
            every,
            reset_at="iteration",
            emit_at="iteration",
            mode="train",
        )

    def before_training(self, strategy: "SupervisedTemplate") -> None:
        super().before_training(strategy)
        self._gpu.start_thread()

    def after_training(self, strategy: "SupervisedTemplate") -> None:
        super().after_training(strategy)
        self._gpu.stop_thread()

    def __str__(self):
        return f"MaxGPU{self.gpu_id}Usage_MB"
class EpochMaxGPU(GPUPluginMetric):
    """
    The Epoch Max GPU metric.

    This plugin metric only works at training time.
    """

    def __init__(self, gpu_id, every=0.5):
        """
        Creates an instance of the Epoch Max GPU metric.

        :param gpu_id: GPU device ID.
        :param every: seconds after which the maximum GPU usage is updated.
        """
        super(EpochMaxGPU, self).__init__(
            gpu_id, every, reset_at="epoch", emit_at="epoch", mode="train"
        )

    def before_training(self, strategy: "SupervisedTemplate"):
        super().before_training(strategy)
        self._gpu.start_thread()

    def after_training(self, strategy: "SupervisedTemplate") -> None:
        self._gpu.stop_thread()

    def __str__(self):
        return f"MaxGPU{self.gpu_id}Usage_Epoch"
class ExperienceMaxGPU(GPUPluginMetric):
    """
    The Experience Max GPU metric.

    This plugin metric only works at eval time.
    """

    def __init__(self, gpu_id, every=0.5):
        """
        Creates an instance of the Experience Max GPU metric.

        :param gpu_id: GPU device ID.
        :param every: seconds after which the maximum GPU usage is updated.
        """
        super(ExperienceMaxGPU, self).__init__(
            gpu_id,
            every,
            reset_at="experience",
            emit_at="experience",
            mode="eval",
        )

    def before_eval(self, strategy: "SupervisedTemplate"):
        super().before_eval(strategy)
        self._gpu.start_thread()

    def after_eval(self, strategy: "SupervisedTemplate"):
        super().after_eval(strategy)
        self._gpu.stop_thread()

    def __str__(self):
        return f"MaxGPU{self.gpu_id}Usage_Experience"
class StreamMaxGPU(GPUPluginMetric):
    """
    The Stream Max GPU metric.

    This plugin metric only works at eval time.
    """

    def __init__(self, gpu_id, every=0.5):
        """
        Creates an instance of the Stream Max GPU metric.

        :param gpu_id: GPU device ID.
        :param every: seconds after which the maximum GPU usage is updated.
        """
        super(StreamMaxGPU, self).__init__(
            gpu_id, every, reset_at="stream", emit_at="stream", mode="eval"
        )

    def before_eval(self, strategy):
        super().before_eval(strategy)
        self._gpu.start_thread()

    def after_eval(self, strategy: "SupervisedTemplate") -> MetricResult:
        packed = super().after_eval(strategy)
        self._gpu.stop_thread()
        return packed

    def __str__(self):
        return f"MaxGPU{self.gpu_id}Usage_Stream"
def gpu_usage_metrics(
    gpu_id,
    every=0.5,
    minibatch=False,
    epoch=False,
    experience=False,
    stream=False,
) -> List[PluginMetric]:
    """
    Helper method that can be used to obtain the desired set of
    plugin metrics.

    :param gpu_id: GPU device ID.
    :param every: seconds after which the maximum GPU usage is updated.
    :param minibatch: If True, will return a metric able to log
        the minibatch max GPU usage.
    :param epoch: If True, will return a metric able to log
        the epoch max GPU usage.
    :param experience: If True, will return a metric able to log
        the experience max GPU usage.
    :param stream: If True, will return a metric able to log
        the max GPU usage of the whole eval stream.

    :return: A list of plugin metrics.
    """
    metrics = []
    if minibatch:
        metrics.append(MinibatchMaxGPU(gpu_id, every))
    if epoch:
        metrics.append(EpochMaxGPU(gpu_id, every))
    if experience:
        metrics.append(ExperienceMaxGPU(gpu_id, every))
    if stream:
        metrics.append(StreamMaxGPU(gpu_id, every))
    return metrics
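# Illustrative usage only (not part of the original module): a minimal sketch
# of how the metrics built by `gpu_usage_metrics` are typically attached to an
# Avalanche `EvaluationPlugin`, assuming the usual `InteractiveLogger` setup.
def _example_evaluation_plugin(gpu_id=0):
    # Local imports so this sketch has no import-time side effects.
    from avalanche.logging import InteractiveLogger
    from avalanche.training.plugins import EvaluationPlugin

    eval_plugin = EvaluationPlugin(
        *gpu_usage_metrics(
            gpu_id,
            every=0.5,
            minibatch=True,
            epoch=True,
            experience=True,
            stream=True,
        ),
        loggers=[InteractiveLogger()],
    )
    # The resulting plugin is then passed to a strategy, e.g. via
    # `evaluator=eval_plugin`, so that the Max GPU values are logged.
    return eval_plugin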
__all__ = [
    "MaxGPU",
    "MinibatchMaxGPU",
    "EpochMaxGPU",
    "ExperienceMaxGPU",
    "StreamMaxGPU",
    "gpu_usage_metrics",
]