################################################################################
# Copyright (c) 2023 ContinualAI.                                              #
# Copyrights licensed under the MIT License.                                   #
# See the accompanying LICENSE file for terms.                                 #
#                                                                              #
# Date: 15-09-2023                                                             #
# Author(s): Antonio Carta                                                     #
# E-mail: contact@continualai.org                                              #
# Website: avalanche.continualai.org                                           #
################################################################################

"""High-level benchmark generators for supervised scenarios such as class-incremental."""
import warnings
from copy import copy
from typing import (
    Iterable,
    Sequence,
    Optional,
    Dict,
    List,
    Protocol,
)

import torch

from avalanche.benchmarks.utils.classification_dataset import (
    ClassificationDataset,
    _as_taskaware_supervised_classification_dataset,
)
from avalanche.benchmarks.utils.data import AvalancheDataset
from avalanche.benchmarks.utils.data_attribute import DataAttribute
from .dataset_scenario import _split_dataset_by_attribute, DatasetExperience
from .generic_scenario import CLScenario, CLStream, EagerCLStream



def class_incremental_benchmark(
    datasets_dict: Dict[str, ClassificationDataset],
    *,
    class_order: Optional[Sequence[int]] = None,
    num_experiences: Optional[int] = None,
    num_classes_per_exp: Optional[Sequence[int]] = None,
    seed: Optional[int] = None,
) -> CLScenario:
    """Splits datasets according to a class-incremental scenario.

    Each dataset will create a stream with the same class order.

    :param datasets_dict: A dictionary with stream names as keys (str) and
        AvalancheDataset as values. Usually, you want to provide at least a
        train and a test stream.
    :param class_order: List of classes that determines their order of
        appearance in the stream. If `None`, a random class order will be
        used. Defaults to None.
    :param num_experiences: The desired number of experiences in the stream.
        Mutually exclusive with `num_classes_per_exp`.
    :param num_classes_per_exp: If not None, a list with the number of classes
        to pick for each experience. Mutually exclusive with
        `num_experiences`.
    :param seed: The seed to use for random shuffling if `class_order is
        None`. If None, the current PyTorch random number generator state
        will be used. Defaults to None.
    :return: A class-incremental :class:`CLScenario`.
    """
    if (class_order is not None) and (seed is not None):
        raise ValueError("Can't set `seed` if a fixed `class_order` is given.")
    if (num_classes_per_exp is not None) and (num_experiences is not None):
        raise ValueError(
            "Only one of `num_classes_per_exp` or `num_experiences` can be used."
        )
    if (num_classes_per_exp is None) and (num_experiences is None):
        raise ValueError(
            "One of `num_classes_per_exp` or `num_experiences` must be set."
        )
    if num_experiences is not None and num_experiences < 1:
        raise ValueError(
            "Invalid number of experiences (num_experiences "
            "parameter): must be greater than 0"
        )

    # convert to AvalancheDataset without mutating the caller's dictionary
    datasets_dict = dict(datasets_dict)
    for name, dd in datasets_dict.items():
        if not isinstance(dd, AvalancheDataset):
            datasets_dict[name] = _as_taskaware_supervised_classification_dataset(dd)

    # validate classes
    first_dataset = list(datasets_dict.values())[0]
    dd_classes = first_dataset.targets.uniques
    num_classes: int = 1 + max(dd_classes)
    if (num_classes_per_exp is not None) and (num_classes != sum(num_classes_per_exp)):
        raise ValueError(
            "`sum(num_classes_per_exp)` must be equal to the total number of classes."
        )
    for dd in datasets_dict.values():  # all datasets must have the same classes
        clss = dd.targets.uniques
        if dd_classes != clss:
            raise ValueError("`datasets` must all have the same classes")

    # pick a random class order if needed
    if class_order is None:
        if seed is not None:
            torch.random.manual_seed(seed)
        class_order = torch.randperm(num_classes).tolist()

    # split classes by experience
    classes_exp_assignment = []
    if num_experiences is not None:
        assert num_classes_per_exp is None, "BUG: num_classes_per_exp must be None"
        classes_per_exp: int = num_classes // num_experiences
        for eid in range(num_experiences):
            start_idx = classes_per_exp * eid
            if eid == num_experiences - 1:
                # the final experience takes the remainder of the classes
                # if they don't divide equally across experiences
                end_idx = num_classes
            else:
                end_idx = start_idx + classes_per_exp
            classes_exp_assignment.append(class_order[start_idx:end_idx])
    elif num_classes_per_exp is not None:
        num_curr = 0
        for eid, n_curr_classes in enumerate(num_classes_per_exp):
            curr_classes = class_order[num_curr : num_curr + n_curr_classes]
            classes_exp_assignment.append(curr_classes)
            num_curr += n_curr_classes

    # create the streams using class_order to split the data
    streams = []
    for name, dd in datasets_dict.items():
        curr_stream = []
        data_by_class = _split_dataset_by_attribute(dd, "targets")
        for eid, clss in enumerate(classes_exp_assignment):
            curr_data: ClassificationDataset = ClassificationDataset(
                [], data_attributes=[DataAttribute([], "targets")]
            )
            for cls in clss:
                # TODO: curr_data.concat(data_by_class[cls]) is bugged
                # and removes targets
                curr_data = data_by_class[cls].concat(curr_data)
            curr_stream.append(DatasetExperience(dataset=curr_data))
        streams.append(EagerCLStream(name, curr_stream))

    return with_classes_timeline(CLScenario(streams))
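

# Example (illustrative sketch, not part of the library API): building a
# class-incremental benchmark from torchvision MNIST. The dataset choice,
# paths, and parameters below are assumptions for demonstration purposes;
# plain torchvision datasets are converted internally by the generator.
def _example_class_incremental_mnist() -> CLScenario:
    from torchvision.datasets import MNIST
    from torchvision.transforms import ToTensor

    train_data = MNIST("./data", train=True, download=True, transform=ToTensor())
    test_data = MNIST("./data", train=False, download=True, transform=ToTensor())
    # 5 experiences with 2 of the 10 MNIST classes each; a fixed seed makes
    # the random class order reproducible
    return class_incremental_benchmark(
        {"train": train_data, "test": test_data},
        num_experiences=5,
        seed=1234,
    )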


def _class_balanced_indices(
    data: ClassificationDataset,
    num_experiences: int,
    shuffle: bool = True,
    seed: Optional[int] = None,
) -> List[List[int]]:
    """Class-balanced indices. Internal helper for `new_instances_benchmark`.

    :param data: the `AvalancheDataset` to split.
    :param num_experiences: length of the stream.
    :param shuffle: if True, shuffle the indices of each class before
        splitting them across experiences.
    :param seed: seed for the random shuffling. If None, the current PyTorch
        random number generator state will be used.
    :return: a list of indices for each experience.
    """
    if seed is not None:
        torch.random.manual_seed(seed)

    # Validate function arguments
    if num_experiences < 1:
        raise ValueError(
            "Invalid number of experiences (num_experiences "
            "parameter): must be greater than 0"
        )

    # experience -> idxs assignment
    exps_idxs: List[List[int]] = [[] for _ in range(num_experiences)]

    # TODO: fix pycharm type hints
    for class_id, class_idxs in data.targets.val_to_idx.items():
        # INVARIANT: class_idxs keeps only indices that are not assigned yet.
        # Whenever we add idxs to an experience, we remove them from class_idxs.
        if shuffle:  # shuffle each class
            perm = torch.randperm(len(class_idxs))
            class_idxs = torch.as_tensor(class_idxs)[perm].tolist()

        # distribute each class equally across experiences
        npats = len(class_idxs) // num_experiences
        for eid in range(num_experiences):
            exps_idxs[eid].extend(class_idxs[:npats])
            class_idxs = class_idxs[npats:]

        # distribute the remainder if not divisible by num_experiences
        if len(class_idxs) > 0:
            if shuffle:
                exps_remaining: Iterable[int] = torch.randperm(
                    num_experiences
                ).tolist()[: len(class_idxs)]
            else:
                exps_remaining = range(len(class_idxs))
            for eid in exps_remaining:
                exps_idxs[eid].append(class_idxs[0])
                class_idxs = class_idxs[1:]

    # sort to keep the original order instead of the class order
    for eid in range(len(exps_idxs)):
        exps_idxs[eid].sort()
    return exps_idxs


def _random_indices(
    data,
    num_experiences: int,
    shuffle: bool = True,
    seed: Optional[int] = None,
    min_class_patterns_in_exp: int = 0,
) -> List[List[int]]:
    """Random indices splitter. Internal helper for `new_instances_benchmark`.

    :param min_class_patterns_in_exp: the random split must respect the
        constraint of having at least `min_class_patterns_in_exp` samples of
        each class in every experience.
    :return: a list of indices for each experience.
    """
    if seed is not None:
        torch.random.manual_seed(seed)

    ##############################
    # Validate function arguments
    ##############################
    if num_experiences < 1:
        raise ValueError(
            "Invalid number of experiences (num_experiences "
            "parameter): must be greater than 0"
        )
    if min_class_patterns_in_exp < 0:
        raise ValueError(
            "Invalid min_class_patterns_in_exp parameter: "
            "must be greater than or equal to 0"
        )

    ##############################
    # patterns -> experience assignment for the train stream
    ##############################
    idxs_per_class = data.targets.val_to_idx

    # experience -> idxs assignment
    exps_idxs: List[List[int]] = [[] for _ in range(num_experiences)]

    # validate the `min_class_patterns_in_exp` argument
    min_class_patterns = min([len(el) for el in idxs_per_class.values()])
    if min_class_patterns < num_experiences * min_class_patterns_in_exp:
        raise ValueError("min_class_patterns_in_exp constraint can't be satisfied")

    for class_id, class_idxs in idxs_per_class.items():
        # INVARIANT: class_idxs keeps only indices that are not assigned yet.
        # Whenever we add idxs to an experience, we remove them from class_idxs.
        if shuffle:
            # shuffle each class so that the assignment is random instead of
            # following the original dataset order
            perm = torch.randperm(len(class_idxs))
            class_idxs = torch.as_tensor(class_idxs)[perm].tolist()

        # first assign exactly min_class_patterns_in_exp to each experience
        for eid in range(num_experiences):
            exps_idxs[eid].extend(class_idxs[:min_class_patterns_in_exp])
            class_idxs = class_idxs[min_class_patterns_in_exp:]

        # distribute the rest equally among experiences
        samples_per_exp = len(class_idxs) // num_experiences
        for eid in range(num_experiences):
            exps_idxs[eid].extend(class_idxs[:samples_per_exp])
            class_idxs = class_idxs[samples_per_exp:]

        # distribute the remaining patterns
        if len(class_idxs) > 0:
            if shuffle:
                exps_remaining: Iterable[int] = torch.randperm(
                    num_experiences
                ).tolist()[: len(class_idxs)]
            else:
                exps_remaining = range(len(class_idxs))
            for eid in exps_remaining:
                exps_idxs[eid].append(class_idxs[0])
                class_idxs = class_idxs[1:]

    # sort to keep the original order instead of the class order
    for ii in range(len(exps_idxs)):
        exps_idxs[ii].sort()
    return exps_idxs
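

# Minimal worked example of the split arithmetic used by the helpers above,
# on plain index lists instead of an AvalancheDataset (all names and numbers
# here are illustrative assumptions):
def _example_balanced_split() -> List[List[int]]:
    num_experiences = 3
    # class 0 has 7 samples, class 1 has 6: 7 // 3 = 2 samples of class 0 go
    # to each experience and the leftover sample lands in exactly one of them
    idxs_per_class = {0: list(range(7)), 1: list(range(7, 13))}
    exps: List[List[int]] = [[] for _ in range(num_experiences)]
    for _cls, idxs in idxs_per_class.items():
        per_exp = len(idxs) // num_experiences
        for eid in range(num_experiences):
            exps[eid].extend(idxs[:per_exp])
            idxs = idxs[per_exp:]
        for eid, leftover in enumerate(idxs):  # distribute the remainder
            exps[eid].append(leftover)
    return exps  # [[0, 1, 6, 7, 8], [2, 3, 9, 10], [4, 5, 11, 12]]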


def new_instances_benchmark(
    train_dataset: ClassificationDataset,
    test_dataset: AvalancheDataset,
    num_experiences: int,
    *,
    shuffle: bool = True,
    seed: Optional[int] = None,
    balance_experiences: bool = False,
    min_class_patterns_in_exp: int = 0,
) -> CLScenario:
    """Benchmark generator for "New Instances" (NI) scenarios.

    Given a `train_dataset` and a `test_dataset`, the generator creates a
    benchmark where the training stream is split according to the New
    Instances setting.

    Notice that the test dataset is not split by this generator: since the
    training splits are random, it is more natural to test on the full test
    set at each step than on an i.i.d. random test split.

    :param train_dataset: An AvalancheDataset used to define the training
        stream.
    :param test_dataset: A test AvalancheDataset. This will not be split.
    :param num_experiences: The desired stream length.
    :param shuffle: If True, the order of the patterns will be shuffled.
        Defaults to True.
    :param seed: A valid int used to initialize the random number generator.
        Can be None.
    :param balance_experiences: If True, the patterns of each class will be
        equally spread across all experiences. If False, patterns will be
        assigned to experiences in a completely random way. Defaults to
        False.
    :param min_class_patterns_in_exp: The minimum amount of patterns of
        every class that must be assigned to every experience. An exception
        will be raised if this constraint can't be satisfied. Ignored when
        ``balance_experiences`` is True, since the class-balanced split
        already spreads every class across all experiences. Defaults to 0.
    :return: A properly initialized :class:`CLScenario` instance.
    """
    if balance_experiences:
        # class-balanced split
        exps_idxs = _class_balanced_indices(
            data=train_dataset,
            num_experiences=num_experiences,
            shuffle=shuffle,
            seed=seed,
        )
    else:
        exps_idxs = _random_indices(
            data=train_dataset,
            num_experiences=num_experiences,
            shuffle=shuffle,
            seed=seed,
            min_class_patterns_in_exp=min_class_patterns_in_exp,
        )

    train_experiences = []
    for idxs in exps_idxs:
        curr_data = train_dataset.subset(indices=idxs)
        train_experiences.append(DatasetExperience(dataset=curr_data))

    train_stream = CLStream("train", train_experiences)
    test_stream = CLStream("test", [DatasetExperience(dataset=test_dataset)])
    return CLScenario(streams=[train_stream, test_stream])
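

# Example (illustrative sketch): a "New Instances" stream over MNIST, where
# every experience contains new samples of the same 10 classes. The dataset
# choice and parameters are assumptions; the torchvision datasets are wrapped
# with the same helper the class-incremental generator uses internally.
def _example_new_instances_mnist() -> CLScenario:
    from torchvision.datasets import MNIST
    from torchvision.transforms import ToTensor

    train_data = _as_taskaware_supervised_classification_dataset(
        MNIST("./data", train=True, download=True, transform=ToTensor())
    )
    test_data = _as_taskaware_supervised_classification_dataset(
        MNIST("./data", train=False, download=True, transform=ToTensor())
    )
    return new_instances_benchmark(
        train_data,
        test_data,
        num_experiences=10,
        balance_experiences=True,  # preserve class ratios in every experience
        seed=1234,
    )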


class ClassesTimeline(Protocol):
    """Experience decorator that provides info about classes occurrence over time."""

    @property
    def classes_in_this_experience(self) -> List[int]:
        """The list of classes in this experience."""
        ...

    @property
    def previous_classes(self) -> List[int]:
        """The list of classes in previous experiences."""
        ...

    @property
    def classes_seen_so_far(self) -> List[int]:
        """The list of classes of the current and previous experiences."""
        ...

    @property
    def future_classes(self) -> List[int]:
        """The list of classes of the next experiences."""
        ...


def with_classes_timeline(obj):
    """Add `ClassesTimeline` attributes.

    `obj` must be a scenario or a stream.
    """

    def _decorate_benchmark(obj: CLScenario):
        new_streams = []
        for s in obj.streams.values():
            new_streams.append(_decorate_stream(s))
        return CLScenario(new_streams)

    def _decorate_stream(obj: CLStream):
        # TODO: support stream generators. Should return a new generator
        # that applies the decoration every time a new experience is
        # generated.
        new_stream = []
        if not isinstance(obj, EagerCLStream):
            warnings.warn("stream generator will be converted to a list.")

        # compute the set of all classes in the stream
        all_cls: set[int] = set()
        for exp in obj:
            all_cls = all_cls.union(exp.dataset.targets.uniques)

        prev_cls: set[int] = set()
        for exp in obj:
            new_exp = copy(exp)
            curr_cls = exp.dataset.targets.uniques

            new_exp.classes_in_this_experience = curr_cls
            new_exp.previous_classes = set(prev_cls)
            new_exp.classes_seen_so_far = curr_cls.union(prev_cls)
            # TODO: future_classes ignores repetitions right now...
            # implement and test scenario with repetitions
            new_exp.future_classes = all_cls.difference(new_exp.classes_seen_so_far)
            new_stream.append(new_exp)

            prev_cls = prev_cls.union(curr_cls)
        return EagerCLStream(obj.name, new_stream)

    if isinstance(obj, CLScenario):
        return _decorate_benchmark(obj)
    elif isinstance(obj, CLStream):
        return _decorate_stream(obj)
    else:
        raise ValueError(
            "Unsupported object type: must be one of {CLScenario, CLStream}"
        )
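

# Example (illustrative sketch): reading the timeline attributes added by
# `with_classes_timeline`. `benchmark` is assumed to be a scenario with a
# stream named "train", e.g. one built by the generators above (note that
# `class_incremental_benchmark` already applies this decorator for you).
def _example_classes_timeline(benchmark: CLScenario) -> None:
    decorated = with_classes_timeline(benchmark)
    for exp in decorated.streams["train"]:
        print(
            "current:", sorted(exp.classes_in_this_experience),
            "seen so far:", sorted(exp.classes_seen_so_far),
            "future:", sorted(exp.future_classes),
        )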


__all__ = [
    "class_incremental_benchmark",
    "new_instances_benchmark",
    "with_classes_timeline",
]