Source code for avalanche.benchmarks.scenarios.online

################################################################################
# Copyright (c) 2022 ContinualAI.                                              #
# Copyrights licensed under the MIT License.                                   #
# See the accompanying LICENSE file for terms.                                 #
#                                                                              #
# Date: 11-04-2022                                                             #
# Author(s): Antonio Carta                                                     #
# E-mail: contact@continualai.org                                              #
# Website: avalanche.continualai.org                                           #
################################################################################
from functools import partial
import random
from typing import (
    Callable,
    Generator,
    Generic,
    Iterable,
    List,
    Optional,
    TypeVar,
    Union,
    Protocol,
    Literal,
)
import warnings
from avalanche.benchmarks.utils.data import AvalancheDataset
from avalanche.benchmarks.utils.utils import concat_datasets
import torch
from torch.distributions.categorical import Categorical
from torch.utils.data import Sampler
from .deprecated.benchmark_wrapper_utils import wrap_stream

from .generic_scenario import (
    CLStream,
    CLScenario,
)
from .dataset_scenario import DatasetExperience


# Type variables used by the generic annotations in this module. The bounds
# are given as strings (forward references).
# Dataset type carried by an experience.
TCLDataset = TypeVar("TCLDataset", bound="AvalancheDataset")
# Scenario (benchmark) type.
TCLScenario = TypeVar("TCLScenario", bound="CLScenario")
# Self-type used to annotate `self` in `OnlineCLExperience.__init__`.
TOnlineCLExperience = TypeVar("TOnlineCLExperience", bound="OnlineCLExperience")


class CyclicSampler(Sampler):
    """Samples elements from [0,..,len(dataset)-1] in a cyclic manner.

    Iterating the sampler yields indices endlessly: once every index of the
    current pass has been produced, the index order is regenerated
    (reshuffled when ``shuffle`` is True) and a new pass starts.

    An empty sampler (``n_samples == 0``) yields nothing instead of spinning
    forever.

    :param n_samples: number of indices produced in one pass.
    :param shuffle: if True, every pass uses a new random permutation.
    :param rng: optional ``torch.Generator`` forwarded to ``torch.randperm``.
    """

    def __init__(self, n_samples, shuffle=True, rng=None):
        self.n_samples = n_samples
        self.rng = rng
        self.shuffle = shuffle
        self._reset_indices()

    def _reset_indices(self):
        """Regenerate ``self.indices`` for the next pass over the data."""
        if self.shuffle:
            self.indices = torch.randperm(self.n_samples, generator=self.rng).tolist()
        else:
            self.indices = torch.arange(self.n_samples).tolist()

    def __iter__(self):
        # With no indices the cycle below would loop forever without ever
        # yielding, so `next()` on the iterator would hang. Stop instead.
        if not self.indices:
            return
        while True:
            yield from self.indices
            self._reset_indices()

    def __len__(self):
        """Return the number of indices in a single pass."""
        return self.n_samples


class BoundaryAware(Protocol):
    """Protocol for experiences that expose task-boundary information.

    Experiences of an online stream may carry boundary attributes that are
    useful for training or for metrics logging.

    A task boundary marks a change in the underlying data distribution from
    which the experiences' data is sampled.
    """

    @property
    def is_first_subexp(self) -> bool:
        """Whether this experience is the first one after a drift."""
        return False

    @property
    def is_last_subexp(self) -> bool:
        """Whether this experience is the last one before a drift."""
        return False

    @property
    def sub_stream_length(self) -> int:
        """How many experiences share the distribution of this experience."""
        return 0

    @property
    def access_task_boundaries(self) -> bool:
        """Whether the model is allowed to see task boundaries.

        For a boundary-agnostic model, task boundaries are available only
        for logging, by setting the experience in logging mode
        `experience.logging()`.
        """
        return False


class OnlineCLExperience(DatasetExperience, Generic[TCLDataset]):
    """Online CL (OCL) Experience.

    OCL experiences are created by splitting a larger experience. Therefore,
    they keep track of the original experience for logging purposes.
    """

    def __init__(
        self: TOnlineCLExperience,
        *,
        dataset: TCLDataset,
        origin_experience: Optional[DatasetExperience],
        is_first_subexp: bool = False,
        is_last_subexp: bool = False,
        sub_stream_length: Optional[int] = None,
        access_task_boundaries: bool = False,
    ):
        """A class representing a continual learning experience in an online
        setting.

        All arguments are keyword-only.

        :param dataset: The dataset containing the experience.
        :param origin_experience: The original experience from which this
            experience was derived. May be None when the sub-experience does
            not come from a single experience.
        :param is_first_subexp: Whether this is the first sub-experience.
        :param is_last_subexp: Whether this is the last sub-experience.
        :param sub_stream_length: The length of the sub-stream.
        :param access_task_boundaries: Whether the boundary attributes are
            usable during training and evaluation. The value is forwarded as
            both `use_in_train` and `use_in_eval` to `_as_attributes`.
        """
        super().__init__(dataset=dataset)
        self.access_task_boundaries = access_task_boundaries

        self.origin_experience: Optional[DatasetExperience] = origin_experience
        # Size of this sub-experience, i.e. the number of instances in `dataset`.
        self.subexp_size: int = len(dataset)
        self.is_first_subexp: bool = is_first_subexp
        self.is_last_subexp: bool = is_last_subexp
        self.sub_stream_length: Optional[int] = sub_stream_length

        # Register the boundary-related fields through the base-class helper,
        # gating their use in train/eval on `access_task_boundaries`.
        self._as_attributes(
            "origin_experience",
            "subexp_size",
            "is_first_subexp",
            "is_last_subexp",
            "sub_stream_length",
            use_in_train=access_task_boundaries,
            use_in_eval=access_task_boundaries,
        )
# ========== Fixed-sized splits


class FixedSizeExperienceSplitter:
    """Iterable that lazily splits one experience into fixed-size
    sub-experiences.

    Every call to `iter()` replays the same split: the shuffling seed is
    fixed at construction time.
    """

    def __init__(
        self,
        experience: DatasetExperience,
        experience_size: int,
        shuffle: bool = True,
        drop_last: bool = False,
        access_task_boundaries: bool = False,
        seed: Optional[int] = None,
    ) -> None:
        """Returns a lazy stream generated by splitting an experience into
        smaller ones.

        Splits the experience in smaller experiences of size
        `experience_size`.

        Experience decorators (e.g. class attributes) will be stripped from
        the experience. You will need to re-apply them to the resulting
        experiences if you need them.

        :param experience: The experience to split.
        :param experience_size: The experience size (number of instances).
            Must be strictly positive.
        :param shuffle: If True, instances will be shuffled before splitting.
        :param drop_last: If True, the last mini-experience will be dropped
            if not of size `experience_size`.
        :param access_task_boundaries: Forwarded to every generated
            `OnlineCLExperience`.
        :param seed: random seed for shuffling the data if `shuffle == True`.
            If None, a random seed is drawn once at construction.
        :raises ValueError: if `experience_size` is not strictly positive.
        """
        if experience_size <= 0:
            # A zero size divides by zero while iterating and a negative one
            # never terminates, so reject both up front.
            raise ValueError(
                f"experience_size must be a positive integer, got {experience_size}"
            )

        self.experience = experience
        self.experience_size = experience_size
        self.shuffle = shuffle
        self.drop_last = drop_last
        self.access_task_boundaries = access_task_boundaries

        # we need to fix the seed because repeated calls to the generator
        # must return the same order every time.
        self.seed = random.randint(0, 2**32 - 1) if seed is None else seed

    def __iter__(self) -> Generator[OnlineCLExperience, None, None]:
        """Yield the sub-experiences in order.

        The first yielded experience has `is_first_subexp=True` and the last
        one has `is_last_subexp=True`, whatever the value of `drop_last` and
        whether or not the dataset length is a multiple of
        `experience_size`.
        """
        exp_dataset = self.experience.dataset
        n_samples = len(exp_dataset)

        if self.shuffle:
            g = torch.Generator()
            g.manual_seed(self.seed)
            exp_indices = torch.randperm(n_samples, generator=g).tolist()
        else:
            exp_indices = list(range(n_samples))

        # Number of sub-experiences that will actually be produced.
        sub_stream_length, remainder = divmod(n_samples, self.experience_size)
        if remainder > 0 and not self.drop_last:
            sub_stream_length += 1

        for exp_idx in range(sub_stream_length):
            init_idx = exp_idx * self.experience_size
            # Exclusive upper bound; only the trailing partial chunk is clipped.
            final_idx = min(init_idx + self.experience_size, n_samples)

            yield OnlineCLExperience(
                dataset=exp_dataset.subset(exp_indices[init_idx:final_idx]),
                origin_experience=self.experience,
                is_first_subexp=exp_idx == 0,
                is_last_subexp=exp_idx == sub_stream_length - 1,
                sub_stream_length=sub_stream_length,
                access_task_boundaries=self.access_task_boundaries,
            )


def _default_online_split(
    shuffle: bool,
    drop_last: bool,
    access_task_boundaries: bool,
    exp: DatasetExperience,
    size: int,
    seed: Optional[int],
):
    """Build the default splitter (fixed-size chunks) for a single experience.

    The configuration arguments come first so that they can be bound ahead of
    time, leaving `(exp, size)` as the call signature of a split strategy.
    """
    return FixedSizeExperienceSplitter(
        experience=exp,
        experience_size=size,
        shuffle=shuffle,
        drop_last=drop_last,
        access_task_boundaries=access_task_boundaries,
        seed=seed,
    )
def split_online_stream(
    original_stream: Iterable[DatasetExperience],
    experience_size: int,
    shuffle: bool = True,
    drop_last: bool = False,
    experience_split_strategy: Optional[
        Callable[
            [DatasetExperience[TCLDataset], int],
            Iterable[OnlineCLExperience[TCLDataset]],
        ]
    ] = None,
    access_task_boundaries: bool = False,
    seed: Optional[int] = None,
) -> CLStream[DatasetExperience[TCLDataset]]:
    """Split a stream of large batches to create an online stream of small
    mini-batches.

    The resulting stream can be used for Online Continual Learning (OCL)
    scenarios (or data-incremental, or other online-based settings).

    For efficiency reasons, the resulting stream is an iterator, generating
    experience on-demand.

    :param original_stream: The stream with the original data.
    :param experience_size: The size of the experience, as an int. It is
        passed as the second argument to the split strategy, including a
        custom `experience_split_strategy`.
    :param shuffle: If True, experiences will be split by first shuffling
        instances in each experience. Shuffling is driven by `seed`.
        Defaults to True. Ignored if `experience_split_strategy` is used.
    :param drop_last: If True, if the last experience doesn't contain
        `experience_size` instances, then the last experience will be
        dropped. Defaults to False. Ignored if `experience_split_strategy`
        is used.
    :param experience_split_strategy: A function that implements a custom
        splitting strategy. The function must accept an experience and the
        experience size, and return an iterable of experiences. Defaults to
        None, which means that the standard splitting strategy will be used
        (which creates experiences of size `experience_size`).
        A good starting point to understand the mechanism is the
        implementation of the standard splitter,
        :class:`FixedSizeExperienceSplitter`.
    :param access_task_boundaries: Forwarded to the default splitter.
        Defaults to False. Ignored if `experience_split_strategy` is used.
    :param seed: random seed used for shuffling by the default splitter. The
        same value is passed to the splitter of every experience. Ignored if
        `experience_split_strategy` is used.

    :return: A lazy online stream with experiences of size `experience_size`.
    """
    if experience_split_strategy is None:
        # functools.partial is a more compact option
        # However, MyPy does not understand what a partial is -_-
        def default_online_split_wrapper(e, e_sz):
            return _default_online_split(
                shuffle, drop_last, access_task_boundaries, e, e_sz, seed=seed
            )

        split_strategy = default_online_split_wrapper
    else:
        split_strategy = experience_split_strategy

    def exps_iter():
        # Flatten: every original experience contributes its sub-experiences
        # in order, produced only when the stream is consumed.
        for exp in original_stream:
            yield from split_strategy(exp, experience_size)

    stream_name: str = getattr(original_stream, "name", "train") + "_online"
    return CLStream(
        name=stream_name,
        exps_iter=exps_iter(),
        set_stream_info=True,
    )
def _fixed_size_split(
    online_benchmark: "OnlineCLScenario",  # TODO: Deprecated and unused. Remove.
    experience_size: int,
    access_task_boundaries: bool,
    shuffle: bool,
    s: Iterable[DatasetExperience[TCLDataset]],
) -> CLStream[DatasetExperience[TCLDataset]]:
    """Split stream `s` into fixed-size online experiences.

    Thin adapter around `split_online_stream` whose leading parameters are
    meant to be bound ahead of time; `online_benchmark` is accepted but not
    used.
    """
    return split_online_stream(
        original_stream=s,
        experience_size=experience_size,
        access_task_boundaries=access_task_boundaries,
        shuffle=shuffle,
    )


# ========== Continuous linear decay splits


def create_sub_exp_from_multi_exps(
    original_stream: Iterable[DatasetExperience[TCLDataset]],
    samplers: List[Generator[int, None, None]],
    exp_per_sample_list: torch.Tensor,
    total_iters: int,
    is_first_sub_exp: bool = False,
    is_last_sub_exp: bool = False,
) -> DatasetExperience[TCLDataset]:
    """
    Creates a sub-experience from a list of experiences.

    :param original_stream: The original stream. It is indexed by experience
        id, so it must support `original_stream[i]`.
    :param samplers: A list of index iterators, one for each experience in
        the original stream. `next()` is called on them to draw the indices.
    :param exp_per_sample_list: An integer tensor of experience ids, one for
        each sample in the sub-experience.
    :param total_iters: The total number of iterations.
    :param is_first_sub_exp: Whether this is the first sub-experience.
    :param is_last_sub_exp: Whether this is the last sub-experience.

    :return: A sub-experience. It additionally carries the attribute
        `n_samples_from_each_exp`, the number of samples drawn from each
        original experience.
    """
    # Create sub-sets from each experience's dataset
    all_subsets = []
    n_samples_from_each_exp = [0] * len(samplers)

    # One pass gives the distinct experience ids (sorted) and how many
    # samples were assigned to each.
    exp_ids, counts = exp_per_sample_list.unique(return_counts=True)
    for exp_id, n_samples in zip(exp_ids.tolist(), counts.tolist()):
        n_samples_from_each_exp[exp_id] += n_samples
        sampler = samplers[exp_id]
        rnd_indices = [next(sampler) for _ in range(n_samples)]
        all_subsets.append(original_stream[exp_id].dataset.subset(rnd_indices))

    # Concatenate all sub-sets
    sub_exp_subset = concat_datasets(all_subsets)

    exp = OnlineCLExperience(
        dataset=sub_exp_subset,
        origin_experience=None,  # samples come from several experiences
        is_first_subexp=is_first_sub_exp,
        is_last_subexp=is_last_sub_exp,
        sub_stream_length=total_iters,
        access_task_boundaries=False,
    )

    # For visualization purposes only
    exp.n_samples_from_each_exp = n_samples_from_each_exp

    return exp
def split_continuous_linear_decay_stream(
    original_stream: Iterable[DatasetExperience[TCLDataset]],
    experience_size: int,
    iters_per_virtual_epoch: int,
    beta: float,
    shuffle: bool,
) -> CLStream[DatasetExperience[TCLDataset]]:
    """Creates a stream of sub-experiences from a list of overlapped
    experiences with a linear decay in the overlapping areas.

    :param original_stream: The original stream. Its length is read with
        `len()` and its experiences are accessed by index.
    :param experience_size: The size of each sub-experience.
    :param iters_per_virtual_epoch: The number of iterations per virtual
        epoch. This parameter determines the number of (sub-)experiences that
        we want to create from each experience in the original stream, after
        "merging" all experiences with a certain level of "overlap".
    :param beta: The beta parameter for the linear decay function which
        indicates the amount of overlap.
    :param shuffle: Forwarded to the `CyclicSampler` of every experience, i.e.
        whether sample indices are drawn in shuffled order.

    Terminology is taken from the official implementation of the paper:
    "Task Agnostic Continual Learning Using Online Variational Bayes" by
    Zeno et al. (https://arxiv.org/abs/2006.05990)
    Code repo: https://github.com/chenzeno/FOO-VB/tree/main

    :return: A stream of sub-experiences.
    """

    def _get_linear_line(start, end, direction="up"):
        # Linear ramp over [start, end): rising from 0 for "up", falling
        # from 1 otherwise. An empty range gives an empty tensor.
        span = end - start
        ramp = [(i - start) / span for i in range(start, end)]
        if direction != "up":
            ramp = [1 - value for value in ramp]
        return torch.tensor(ramp, dtype=torch.float)

    def _create_task_probs(iters, tasks, task_id, beta=3):
        # Unnormalized weight of `task_id` at every iteration: a plateau of
        # ones, with linear ramps on its sides when beta > 1.
        if beta <= 1:
            peak_start = int((task_id / tasks) * iters)
            peak_end = int(((task_id + 1) / tasks) * iters)
            start = peak_start
            end = peak_end
        else:
            start = max(int(((beta * task_id - 1) * iters) / (beta * tasks)), 0)
            peak_start = int(((beta * task_id + 1) * iters) / (beta * tasks))
            peak_end = int(((beta * task_id + (beta - 1)) * iters) / (beta * tasks))
            end = min(
                int(((beta * task_id + (beta + 1)) * iters) / (beta * tasks)), iters
            )

        probs = torch.zeros(iters, dtype=torch.float)
        # The first task has no ramp-up and the last one no ramp-down.
        if task_id == 0:
            probs[start:peak_start].add_(1)
        else:
            probs[start:peak_start] = _get_linear_line(
                start, peak_start, direction="up"
            )
        probs[peak_start:peak_end].add_(1)
        if task_id == tasks - 1:
            probs[peak_end:end].add_(1)
        else:
            probs[peak_end:end] = _get_linear_line(peak_end, end, direction="down")
        return probs

    n_experiences = len(original_stream)

    # Total number of iterations
    total_iters = n_experiences * iters_per_virtual_epoch

    # Probabilities over all iterations (sub-experiences)
    per_task_probs = [
        _create_task_probs(total_iters, n_experiences, exp_id, beta=beta)
        for exp_id in range(n_experiences)
    ]

    # Normalize probabilities
    normalize_probs = torch.zeros_like(per_task_probs[0])
    for probs in per_task_probs:
        normalize_probs.add_(probs)
    for probs in per_task_probs:
        probs.div_(normalize_probs)

    # Stack to (n_experiences, total_iters) and split by column: entry `i` is
    # the distribution over experiences used at iteration `i`.
    tasks_probs_over_iterations = torch.stack(per_task_probs).unbind(dim=1)

    # Random cyclic samplers over the datasets of all experiences in the stream
    samplers = [
        iter(CyclicSampler(len(exp.dataset), shuffle=shuffle))
        for exp in original_stream
    ]

    # The main iterator for the stream
    def exps_iter():
        n_samples = torch.Size([experience_size])
        for sub_exp_id in range(total_iters):
            # Draw, for every sample of this sub-experience, the id of the
            # experience it is taken from.
            exp_per_sample_list = Categorical(
                probs=tasks_probs_over_iterations[sub_exp_id]
            ).sample(n_samples)

            yield create_sub_exp_from_multi_exps(
                original_stream,
                samplers,
                exp_per_sample_list,
                total_iters,
                sub_exp_id == 0,
                sub_exp_id == total_iters - 1,
            )

    stream_name: str = getattr(original_stream, "name", "train")
    return CLStream(
        name=stream_name,
        exps_iter=exps_iter(),
        set_stream_info=True,
    )
# ========== Online CL scenario
class OnlineCLScenario(CLScenario):
    """Online scenario built from the streams of an existing CL scenario.

    Deprecated: a warning is emitted on construction pointing to
    `split_online_stream` and similar functions.
    """

    def __init__(
        self,
        original_streams: Iterable[CLStream[DatasetExperience[TCLDataset]]],
        experiences: Optional[
            Union[
                DatasetExperience[TCLDataset], Iterable[DatasetExperience[TCLDataset]]
            ]
        ] = None,
        experience_size: int = 10,
        stream_split_strategy: Literal[
            "fixed_size_split", "continuous_linear_decay"
        ] = "fixed_size_split",
        access_task_boundaries: bool = False,
        shuffle: bool = True,
        overlap_factor: int = 4,
        iters_per_virtual_epoch: int = 10,
    ):
        """Creates an online scenario from an existing CL scenario

        :param original_streams: The streams from the original CL scenario.
            Must contain a stream named "train".
        :param experiences: If None, the online stream will be created
            from the `train_stream` of the original CL scenario, otherwise it
            will create an online stream from the given sequence of
            experiences. A single (non-iterable) experience is also accepted.
        :param experience_size: The size of each online experiences, as an
            int.
        :param stream_split_strategy: Name of the splitting strategy:
            "fixed_size_split" creates consecutive experiences of size
            `experience_size` (see :func:`split_online_stream`), while
            "continuous_linear_decay" creates overlapping experiences (see
            :func:`split_continuous_linear_decay_stream`).
        :param access_task_boundaries: If True the attributes related to task
            boundaries such as `is_first_subexp` and `is_last_subexp` become
            accessible during training. Must be False with the
            "continuous_linear_decay" strategy.
        :param shuffle: If True, experiences will be split by first shuffling
            instances in each experience. Defaults to True.
        :param overlap_factor: The overlap factor between consecutive
            experiences, passed as `beta` to the "continuous_linear_decay"
            strategy. Defaults to 4.
        :param iters_per_virtual_epoch: The number of iterations per virtual
            epoch for each experience, used by the "continuous_linear_decay"
            strategy. Defaults to 10.
        :raises ValueError: if `stream_split_strategy` is unknown or
            `original_streams` has no "train" stream.
        """
        warnings.warn(
            "Deprecated. Use `split_online_stream` or similar methods to split "
            "single streams or experiences instead"
        )

        # The streams are traversed twice below; materialize them so that
        # one-shot iterables (e.g. generators) are supported as well.
        original_streams = list(original_streams)

        if stream_split_strategy == "fixed_size_split":
            split_strat = partial(
                _fixed_size_split,
                self,
                experience_size,
                access_task_boundaries,
                shuffle,
            )
        elif stream_split_strategy == "continuous_linear_decay":
            assert access_task_boundaries is False

            # `iters_per_virtual_epoch` and `beta` are parameters of the
            # linear-decay splitter only.
            split_strat = partial(
                split_continuous_linear_decay_stream,
                experience_size=experience_size,
                iters_per_virtual_epoch=iters_per_virtual_epoch,
                beta=overlap_factor,
                shuffle=shuffle,
            )
        else:
            raise ValueError("Unknown experience split strategy")

        streams_dict = {s.name: s for s in original_streams}
        if "train" not in streams_dict:
            raise ValueError("Missing train stream for `original_streams`.")
        if experiences is None:
            online_train_stream = split_strat(streams_dict["train"])
        else:
            if not isinstance(experiences, Iterable):
                experiences = [experiences]
            online_train_stream = split_strat(experiences)

        # The online train stream comes first, followed by every original
        # stream re-exposed under the name "original_<name>".
        streams: List[CLStream] = [online_train_stream]
        for s in original_streams:
            s_wrapped = wrap_stream(
                new_name="original_" + s.name, new_benchmark=self, wrapped_stream=s
            )
            streams.append(s_wrapped)

        super().__init__(streams=streams)
# Names exported by `from <module> import *`.
__all__ = [ "OnlineCLExperience", "FixedSizeExperienceSplitter", "split_online_stream", "split_continuous_linear_decay_stream", "OnlineCLScenario", ]