Source code for avalanche.benchmarks.scenarios.online_scenario

################################################################################
# Copyright (c) 2022 ContinualAI.                                              #
# Copyrights licensed under the MIT License.                                   #
# See the accompanying LICENSE file for terms.                                 #
#                                                                              #
# Date: 11-04-2022                                                             #
# Author(s): Antonio Carta                                                     #
# E-mail: contact@continualai.org                                              #
# Website: avalanche.continualai.org                                           #
################################################################################
from copy import copy
from typing import Callable, Iterable, List, Union

import torch

from avalanche.benchmarks.scenarios.generic_scenario import (
    CLExperience,
    EagerCLStream,
    CLStream,
    ExperienceAttribute,
    CLScenario,
)
from avalanche.benchmarks.utils import AvalancheSubset


class OnlineCLExperience(CLExperience):
    """Online CL (OCL) experience.

    An OCL experience is obtained by splitting a larger experience into
    smaller ones. Each sub-experience keeps a reference to the experience
    it was created from, mainly for logging purposes.
    """

    def __init__(
        self,
        current_experience: int = None,
        origin_stream=None,
        origin_experience=None,
        subexp_size: int = 1,
        is_first_subexp: bool = False,
        is_last_subexp: bool = False,
        sub_stream_length: int = None,
        access_task_boundaries: bool = False
    ):
        """Create an online sub-experience.

        :param current_experience: experience identifier.
        :param origin_stream: origin stream.
        :param origin_experience: the experience that was split to create
            this sub-experience.
        :param subexp_size: the size used to split the origin experience.
        :param is_first_subexp: whether this is the first element of the
            sub-experiences stream.
        :param is_last_subexp: whether this is the last element of the
            sub-experiences stream.
        :param sub_stream_length: the length of the sub-experiences stream.
        :param access_task_boundaries: forwarded as `use_in_train` to every
            boundary-related attribute set below.
        """
        super().__init__(current_experience, origin_stream)
        self.access_task_boundaries = access_task_boundaries

        def boundary_attribute(value):
            # All boundary-related attributes share the same train-time
            # visibility flag.
            return ExperienceAttribute(
                value, use_in_train=access_task_boundaries
            )

        self.origin_experience = boundary_attribute(origin_experience)
        self.subexp_size = boundary_attribute(subexp_size)
        self.is_first_subexp = boundary_attribute(is_first_subexp)
        self.is_last_subexp = boundary_attribute(is_last_subexp)
        self.sub_stream_length = boundary_attribute(sub_stream_length)
def fixed_size_experience_split(
    experience: CLExperience,
    experience_size: int,
    shuffle: bool = True,
    drop_last: bool = False,
    access_task_boundaries: bool = False
):
    """Returns a lazy stream generated by splitting an experience into smaller
    ones.

    Splits the experience in smaller experiences of size `experience_size`.

    :param experience: The experience to split.
    :param experience_size: The experience size (number of instances).
        Must be a positive integer.
    :param shuffle: If True, instances will be shuffled before splitting.
        This uses the default PyTorch random number generator.
    :param drop_last: If True, the last mini-experience will be dropped if
        not of size `experience_size`.
    :param access_task_boundaries: Forwarded to each generated
        :class:`OnlineCLExperience`.
    :return: A generator yielding the :class:`OnlineCLExperience`
        sub-experiences, in order.
    :raises ValueError: If `experience_size` is not positive.
    """
    # A zero size would divide by zero and a negative one would never
    # terminate, so reject both before creating the generator.
    if experience_size < 1:
        raise ValueError("experience_size must be a positive integer.")

    def gen():
        exp_dataset = experience.dataset
        n_samples = len(exp_dataset)

        if shuffle:
            exp_indices = torch.randperm(n_samples).tolist()
        else:
            exp_indices = list(range(n_samples))

        sub_stream_length, remainder = divmod(n_samples, experience_size)
        if remainder > 0 and not drop_last:
            # The trailing, smaller chunk is kept as an extra sub-experience.
            sub_stream_length += 1

        for sub_idx in range(sub_stream_length):
            init_idx = sub_idx * experience_size
            # Exclusive; only the kept trailing chunk can be clipped.
            final_idx = min(init_idx + experience_size, n_samples)

            exp = OnlineCLExperience(
                origin_experience=experience,
                subexp_size=experience_size,
                is_first_subexp=sub_idx == 0,
                # Derived from the position in the sub-stream so that it is
                # also set when the split is exact or `drop_last` is used.
                is_last_subexp=sub_idx == sub_stream_length - 1,
                sub_stream_length=sub_stream_length,
                access_task_boundaries=access_task_boundaries
            )
            exp.dataset = AvalancheSubset(
                exp_dataset, indices=exp_indices[init_idx:final_idx]
            )
            yield exp

    return gen()


def split_online_stream(
    original_stream: EagerCLStream,
    experience_size: int,
    shuffle: bool = False,
    drop_last: bool = False,
    experience_split_strategy: Callable[
        [CLExperience], Iterable[CLExperience]
    ] = None,
    access_task_boundaries: bool = False,
):
    """Split a stream of large batches to create an online stream of small
    mini-batches.

    The resulting stream can be used for Online Continual Learning (OCL)
    scenarios (or data-incremental, or other online-based settings).

    For efficiency reasons, the resulting stream is an iterator, generating
    experience on-demand.

    :param original_stream: The stream with the original data.
    :param experience_size: The size of the experience, as an int. Ignored
        if `experience_split_strategy` is used.
    :param shuffle: If True, experiences will be split by first shuffling
        instances in each experience. This will use the default PyTorch
        random number generator at its current state. Defaults to False.
        Ignored if `experience_split_strategy` is used.
    :param drop_last: If True, if the last experience doesn't contain
        `experience_size` instances, then the last experience will be
        dropped. Defaults to False. Ignored if `experience_split_strategy`
        is used.
    :param experience_split_strategy: A function that implements a custom
        splitting strategy. The function must accept an experience and
        return an experience's iterator. Defaults to None, which means
        that the standard splitting strategy will be used (which creates
        experiences of size `experience_size`). A good starting point to
        understand the mechanism is to look at the implementation of the
        standard splitting function :func:`fixed_size_experience_split`.
    :param access_task_boundaries: Forwarded to the standard splitting
        strategy. Ignored if `experience_split_strategy` is used.
    :return: A lazy online stream with experiences of size
        `experience_size`.
    """
    if experience_split_strategy is None:

        def split_strategy(exp: CLExperience):
            return fixed_size_experience_split(
                exp, experience_size, shuffle, drop_last,
                access_task_boundaries=access_task_boundaries)

    else:
        # Previously no splitter was bound on this path, so iterating the
        # stream failed with a NameError.
        split_strategy = experience_split_strategy

    def exps_iter():
        for exp in original_stream:
            yield from split_strategy(exp)

    # Plain iterables of experiences have no name: fall back to "train".
    stream_name = getattr(original_stream, "name", "train")
    return CLStream(
        name=stream_name, exps_iter=exps_iter(), set_stream_info=True
    )
class OnlineCLScenario(CLScenario):
    """Online CL scenario built on top of the streams of an existing one."""

    def __init__(
        self,
        original_streams: List[EagerCLStream],
        experiences: Union[CLExperience, Iterable[CLExperience]] = None,
        experience_size: int = 10,
        stream_split_strategy="fixed_size_split",
        access_task_boundaries: bool = False
    ):
        """Creates an online scenario from an existing CL scenario

        :param original_streams: The streams from the original CL scenario.
            A stream named "train" must be among them.
        :param experiences: If None, the online stream will be created
            from the `train` stream found in `original_streams`, otherwise
            it will be created from the given experience or sequence of
            experiences.
        :param experience_size: The size of each online experience, as an
            int.
        :param stream_split_strategy: Name of the strategy used to split
            the stream. Only "fixed_size_split" is recognized, which
            delegates to :func:`split_online_stream`.
        :param access_task_boundaries: If True the attributes related to
            task boundaries such as `is_first_subexp` and `is_last_subexp`
            become accessible during training.
        :raises ValueError: If `stream_split_strategy` is not recognized
            or if `original_streams` has no "train" stream.
        """
        if stream_split_strategy != "fixed_size_split":
            raise ValueError("Unknown experience split strategy")

        def make_online_stream(source):
            return split_online_stream(
                source, experience_size,
                access_task_boundaries=access_task_boundaries)

        streams_by_name = {
            stream.name: stream for stream in original_streams
        }
        if "train" not in streams_by_name:
            raise ValueError("Missing train stream for `original_streams`.")

        if experiences is None:
            online_source = streams_by_name["train"]
        elif isinstance(experiences, Iterable):
            online_source = experiences
        else:
            # A single experience is wrapped so that it can be iterated.
            online_source = [experiences]

        streams = [make_online_stream(online_source)]

        # Keep the original streams available under an "original_" prefix.
        for original in original_streams:
            renamed = copy(original)
            old_name = renamed.name
            renamed.name = "original_" + old_name

            # Register the prefixed name on the benchmark as an alias of
            # the definition and stream attribute of the old name.
            benchmark = renamed.benchmark
            benchmark.stream_definitions[renamed.name] = \
                benchmark.stream_definitions[old_name]
            setattr(benchmark, f"{renamed.name}_stream",
                    getattr(benchmark, f"{old_name}_stream"))
            streams.append(renamed)

        super().__init__(streams)