Source code for avalanche.training.templates.supervised

from typing import Sequence, Optional
from pkg_resources import parse_version

import torch
from torch.nn import Module, CrossEntropyLoss
from torch.optim import Optimizer
from torch.utils.data import DataLoader

from avalanche.benchmarks.utils.data_loader import TaskBalancedDataLoader
from avalanche.models import avalanche_forward
from avalanche.models.dynamic_optimizers import reset_optimizer
from avalanche.models.utils import avalanche_model_adaptation
from avalanche.training.plugins import SupervisedPlugin
from avalanche.training.plugins.evaluation import default_evaluator
from avalanche.training.templates.base_sgd import BaseSGDTemplate
from avalanche.training.utils import trigger_plugins


class SupervisedTemplate(BaseSGDTemplate):
    """Base class for continual learning strategies.

    BaseTemplate is the super class of all task-based continual learning
    strategies. It implements a basic training loop and a callback system
    that allows executing code at each experience of the training loop.
    Plugins can be used to implement callbacks that augment the training
    loop with additional behavior (e.g. a memory buffer for replay).

    **Scenarios**
    This strategy supports several continual learning scenarios:

    * class-incremental scenarios (no task labels)
    * multi-task scenarios (task labels are provided)
    * multi-incremental scenarios, where the same task may be revisited

    The exact scenario depends on the data stream and whether it provides
    the task labels.

    **Training loop**
    The training loop is organized as follows::

        train
            train_exp  # for each experience
                adapt_train_dataset
                train_dataset_adaptation
                make_train_dataloader
                train_epoch  # for each epoch
                    # forward
                    # backward
                    # model update

    **Evaluation loop**
    The evaluation loop is organized as follows::

        eval
            eval_exp  # for each experience
                adapt_eval_dataset
                eval_dataset_adaptation
                make_eval_dataloader
                eval_epoch  # for each epoch
                    # forward
                    # backward
                    # model update

    """

    PLUGIN_CLASS = SupervisedPlugin

    def __init__(
        self,
        model: Module,
        optimizer: Optimizer,
        criterion=CrossEntropyLoss(),
        train_mb_size: int = 1,
        train_epochs: int = 1,
        eval_mb_size: Optional[int] = 1,
        device="cpu",
        plugins: Optional[Sequence["SupervisedPlugin"]] = None,
        evaluator=default_evaluator,
        eval_every=-1,
        peval_mode="epoch",
    ):
        """Create the strategy.

        Every argument except ``criterion`` is handed to the parent
        template unchanged; ``criterion`` is stored on this instance.

        :param model: PyTorch model.
        :param optimizer: PyTorch optimizer.
        :param criterion: loss function.
        :param train_mb_size: mini-batch size for training.
        :param train_epochs: number of training epochs.
        :param eval_mb_size: mini-batch size for eval.
        :param device: PyTorch device where the model will be allocated.
        :param plugins: (optional) list of StrategyPlugins.
        :param evaluator: (optional) instance of EvaluationPlugin for
            logging and metric computations. None to remove logging.
        :param eval_every: the frequency of the calls to `eval` inside the
            training loop. -1 disables the evaluation. 0 means `eval` is
            called only at the end of the learning experience. Values >0
            mean that `eval` is called every `eval_every` epochs and at
            the end of the learning experience.
        :param peval_mode: one of {'epoch', 'iteration'}. Decides whether
            the periodic evaluation during training should execute every
            `eval_every` epochs or iterations (Default='epoch').
        """
        super().__init__(
            model=model,
            optimizer=optimizer,
            train_mb_size=train_mb_size,
            train_epochs=train_epochs,
            eval_mb_size=eval_mb_size,
            device=device,
            plugins=plugins,
            evaluator=evaluator,
            eval_every=eval_every,
            peval_mode=peval_mode,
        )
        self._criterion = criterion

        # ------------------------------------------------------------- #
        # State variables, refreshed while the train/eval loops run.    #
        # ------------------------------------------------------------- #

        self.adapted_dataset = None
        """ Data used to train. It may be modified by plugins. Plugins can
        append data to it (e.g. for replay).

        .. note::

            This dataset may contain samples from different experiences.
            If you want the original data for the current experience
            use :attr:`.BaseTemplate.experience`.
        """

    @property
    def mb_x(self):
        """Input of the current mini-batch (first element)."""
        return self.mbatch[0]

    @property
    def mb_y(self):
        """Target of the current mini-batch (second element)."""
        return self.mbatch[1]

    @property
    def mb_task_id(self):
        """Task labels of the current mini-batch (last element).

        The mini-batch must hold at least three elements.
        """
        assert len(self.mbatch) >= 3
        return self.mbatch[-1]

    def criterion(self):
        """Apply the stored loss function to the current output and target."""
        prediction, target = self.mb_output, self.mb_y
        return self._criterion(prediction, target)

    def _before_training_exp(self, **kwargs):
        """Prepare the strategy for training on one experience.

        The dataset adaptation step (e.g. adding new samples or data
        augmentation) runs between its two plugin triggers, and only then
        is the parent setup invoked.
        """
        self._before_train_dataset_adaptation(**kwargs)
        self.train_dataset_adaptation(**kwargs)
        self._after_train_dataset_adaptation(**kwargs)
        super()._before_training_exp(**kwargs)

    def _load_train_state(self, prev_state):
        """Restore a state saved by :meth:`_save_train_state`.

        :param prev_state: mapping that provides, besides the parent's
            entries, the keys ``"adapted_dataset"`` and ``"dataloader"``.
        """
        super()._load_train_state(prev_state)
        self.adapted_dataset = prev_state["adapted_dataset"]
        self.dataloader = prev_state["dataloader"]

    def _save_train_state(self):
        """Save the training state which may be modified by the eval loop.

        This currently includes: experience, adapted_dataset, dataloader,
        is_training, and train/eval modes for each module.

        TODO: we probably need a better way to do this.

        :return: a new dict holding the parent's state plus the
            ``"adapted_dataset"`` and ``"dataloader"`` entries.
        """
        return {
            **super()._save_train_state(),
            "adapted_dataset": self.adapted_dataset,
            "dataloader": self.dataloader,
        }

    def train_dataset_adaptation(self, **kwargs):
        """Set `self.adapted_dataset` from the current experience.

        The experience's dataset is stored first and then replaced by the
        result of its ``train()`` call. ``kwargs`` are not used here.
        """
        self.adapted_dataset = self.experience.dataset
        self.adapted_dataset = self.adapted_dataset.train()

    def _before_eval_exp(self, **kwargs):
        """Prepare the strategy for evaluation on one experience.

        Mirrors :meth:`_before_training_exp`: the eval dataset adaptation
        runs between its two plugin triggers, then the parent setup.
        """
        self._before_eval_dataset_adaptation(**kwargs)
        self.eval_dataset_adaptation(**kwargs)
        self._after_eval_dataset_adaptation(**kwargs)
        super()._before_eval_exp(**kwargs)

    def make_train_dataloader(
        self,
        num_workers=0,
        shuffle=True,
        pin_memory=True,
        persistent_workers=False,
        **kwargs
    ):
        """Data loader initialization.

        Called at the start of each learning experience after the dataset
        adaptation. The loader is stored in `self.dataloader`.

        :param num_workers: number of thread workers for the data loading.
        :param shuffle: True if the data should be shuffled, False
            otherwise.
        :param pin_memory: If True, the data loader will copy Tensors into
            CUDA pinned memory before returning them. Defaults to True.
        :param persistent_workers: passed on to the data loader only when
            the installed torch version is at least 1.7.0.
        :param kwargs: accepted but not used by this implementation.
        """
        loader_options = {
            "oversample_small_groups": True,
            "num_workers": num_workers,
            "batch_size": self.train_mb_size,
            "shuffle": shuffle,
            "pin_memory": pin_memory,
        }
        # `persistent_workers` is only supplied from torch 1.7.0 onwards.
        if parse_version(torch.__version__) >= parse_version("1.7.0"):
            loader_options["persistent_workers"] = persistent_workers

        self.dataloader = TaskBalancedDataLoader(
            self.adapted_dataset, **loader_options
        )

    def make_eval_dataloader(
        self,
        num_workers=0,
        pin_memory=True,
        persistent_workers=False,
        **kwargs
    ):
        """Initializes the eval data loader.

        The loader is stored in `self.dataloader`.

        :param num_workers: How many subprocesses to use for data loading.
            0 means that the data will be loaded in the main process.
            (default: 0).
        :param pin_memory: If True, the data loader will copy Tensors into
            CUDA pinned memory before returning them. Defaults to True.
        :param persistent_workers: passed on to the data loader only when
            the installed torch version is at least 1.7.0.
        :param kwargs: accepted but not used by this implementation.
        :return: None.
        """
        loader_options = {
            "num_workers": num_workers,
            "batch_size": self.eval_mb_size,
            "pin_memory": pin_memory,
        }
        # `persistent_workers` is only supplied from torch 1.7.0 onwards.
        if parse_version(torch.__version__) >= parse_version("1.7.0"):
            loader_options["persistent_workers"] = persistent_workers

        self.dataloader = DataLoader(self.adapted_dataset, **loader_options)

    def forward(self):
        """Compute the model's output given the current mini-batch."""
        inputs, task_labels = self.mb_x, self.mb_task_id
        return avalanche_forward(self.model, inputs, task_labels)

    def model_adaptation(self, model=None):
        """Adapts the model to the current data.

        Calls the :class:`~avalanche.models.DynamicModule`s adaptation.

        :param model: model to adapt; `self.model` is used when None.
        :return: the result of moving the adapted model to `self.device`.
        """
        target = self.model if model is None else model
        avalanche_model_adaptation(target, self.experience)
        return target.to(self.device)

    def _unpack_minibatch(self):
        """We assume mini-batches have the form <x, y, ..., t>.

        This allows for arbitrary tensors between y and t.
        Keep in mind that in the most general case mb_task_id is a tensor
        which may contain different labels for each sample.
        """
        assert len(self.mbatch) >= 3
        super()._unpack_minibatch()

    def eval_dataset_adaptation(self, **kwargs):
        """Set `self.adapted_dataset` from the current experience.

        The experience's dataset is stored first and then replaced by the
        result of its ``eval()`` call. ``kwargs`` are not used here.
        """
        self.adapted_dataset = self.experience.dataset
        self.adapted_dataset = self.adapted_dataset.eval()

    def make_optimizer(self):
        """Optimizer initialization.

        Called before each training experience to configure the optimizer.
        """
        # The optimizer's state is reset after each experience. This allows
        # adding new parameters (new heads) and freezing old units during
        # the model's adaptation phase.
        reset_optimizer(self.optimizer, self.model)

    # ----------------------------------------------------------------- #
    # Plugin triggers                                                   #
    # ----------------------------------------------------------------- #

    def _before_train_dataset_adaptation(self, **kwargs):
        """Fire the `before_train_dataset_adaptation` plugin callbacks."""
        trigger_plugins(self, "before_train_dataset_adaptation", **kwargs)

    def _after_train_dataset_adaptation(self, **kwargs):
        """Fire the `after_train_dataset_adaptation` plugin callbacks."""
        trigger_plugins(self, "after_train_dataset_adaptation", **kwargs)

    def _before_eval_dataset_adaptation(self, **kwargs):
        """Fire the `before_eval_dataset_adaptation` plugin callbacks."""
        trigger_plugins(self, "before_eval_dataset_adaptation", **kwargs)

    def _after_eval_dataset_adaptation(self, **kwargs):
        """Fire the `after_eval_dataset_adaptation` plugin callbacks."""
        trigger_plugins(self, "after_eval_dataset_adaptation", **kwargs)