Source code for avalanche.benchmarks.scenarios.online_scenario

################################################################################
# Copyright (c) 2022 ContinualAI.                                              #
# Copyrights licensed under the MIT License.                                   #
# See the accompanying LICENSE file for terms.                                 #
#                                                                              #
# Date: 11-04-2022                                                             #
# Author(s): Antonio Carta                                                     #
# E-mail: contact@continualai.org                                              #
# Website: avalanche.continualai.org                                           #
################################################################################
from functools import partial
from typing import (
    Callable,
    Generator,
    Iterable,
    List,
    Optional,
    TypeVar,
    Union,
)
from typing_extensions import Literal

import torch
from avalanche.benchmarks.scenarios.benchmark_wrapper_utils import \
    wrap_stream

from avalanche.benchmarks.utils import AvalancheDataset
from avalanche.benchmarks.scenarios.generic_scenario import (
    CLExperience,
    CLStream,
    CLScenario,
    DatasetExperience,
)

from avalanche.benchmarks.scenarios.dataset_scenario import (
    DatasetScenario
)
from avalanche.benchmarks.utils.classification_dataset import \
    ClassificationDataset


TCLDataset = TypeVar('TCLDataset', bound='AvalancheDataset')
TClassificationDataset = TypeVar(
    'TClassificationDataset', bound='ClassificationDataset')
TCLScenario = TypeVar("TCLScenario", bound="CLScenario")
TDatasetScenario = TypeVar("TDatasetScenario", bound="DatasetScenario")
TOnlineCLScenario = TypeVar('TOnlineCLScenario', bound='OnlineCLScenario')
TCLStream = TypeVar('TCLStream', bound='CLStream')
TCLExperience = TypeVar('TCLExperience', bound='CLExperience')
TOnlineCLExperience = TypeVar(
    'TOnlineCLExperience', bound='OnlineCLExperience')
TOnlineClassificationExperience = TypeVar(
    'TOnlineClassificationExperience',
    bound='OnlineClassificationExperience')


class OnlineCLExperience(DatasetExperience[TCLDataset]):
    """Online CL (OCL) Experience.

    OCL experiences are created by splitting a larger experience. Therefore,
    they keep track of the original experience for logging purposes.
    """

    def __init__(
        self: TOnlineCLExperience,
        current_experience: int,
        origin_stream: CLStream[TOnlineCLExperience],
        benchmark: CLScenario,
        dataset: TCLDataset,
        origin_experience: DatasetExperience,
        subexp_size: int = 1,
        is_first_subexp: bool = False,
        is_last_subexp: bool = False,
        sub_stream_length: Optional[int] = None,
        access_task_boundaries: bool = False,
    ):
        """Init.

        :param current_experience: experience identifier.
        :param origin_stream: origin stream.
        :param benchmark: the scenario this experience belongs to.
        :param dataset: the dataset of this sub-experience.
        :param origin_experience: origin experience used to create self.
        :param subexp_size: the size (in instances) of this sub-experience.
        :param is_first_subexp: whether self is the first in the
            sub-experiences stream.
        :param is_last_subexp: whether self is the last in the
            sub-experiences stream.
        :param sub_stream_length: the sub-stream length.
        :param access_task_boundaries: if True, the boundary-related
            attributes are also accessible during training.
        """
        super().__init__(
            current_experience=current_experience,
            origin_stream=origin_stream,
            benchmark=benchmark,
            dataset=dataset)
        self.access_task_boundaries = access_task_boundaries

        self.origin_experience: DatasetExperience = origin_experience
        self.subexp_size: int = subexp_size
        self.is_first_subexp: bool = is_first_subexp
        self.is_last_subexp: bool = is_last_subexp
        self.sub_stream_length: Optional[int] = sub_stream_length

        self._as_attributes(
            'origin_experience',
            'subexp_size',
            'is_first_subexp',
            'is_last_subexp',
            'sub_stream_length',
            use_in_train=access_task_boundaries)

    @property
    def task_labels(self) -> List[int]:
        return self.origin_experience.task_labels
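
# Illustrative sketch (not part of the original module): when an
# OnlineCLExperience is created with `access_task_boundaries=True`, the
# boundary attributes registered above via `_as_attributes(...,
# use_in_train=True)` stay visible while training, so a loop over an online
# stream could react to sub-experience boundaries roughly like this
# (`online_train_stream` and `train_on` are hypothetical names, shown only
# as an assumption of typical usage):
#
#     for sub_exp in online_train_stream:
#         if sub_exp.is_first_subexp:
#             ...  # e.g., initialize per-task state at the boundary
#         train_on(sub_exp.dataset)  # hypothetical training helper
#         if sub_exp.is_last_subexp:
#             ...  # e.g., consolidate once the original experience ends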
class OnlineClassificationExperience(
        OnlineCLExperience[TClassificationDataset]):
    """A specialization of :class:`OnlineCLExperience` with the
    `classes_in_this_experience` field.
    """

    def __init__(
        self: TOnlineClassificationExperience,
        current_experience: int,
        origin_stream: CLStream[TOnlineClassificationExperience],
        benchmark: CLScenario,
        dataset: TClassificationDataset,
        origin_experience: DatasetExperience,
        classes_in_this_experience: List[int],
        subexp_size: int = 1,
        is_first_subexp: bool = False,
        is_last_subexp: bool = False,
        sub_stream_length: Optional[int] = None,
        access_task_boundaries: bool = False,
    ):
        """Init.

        :param current_experience: experience identifier.
        :param origin_stream: origin stream.
        :param origin_experience: origin experience used to create self.
        :param classes_in_this_experience: the classes found in this
            sub-experience.
        :param is_first_subexp: whether self is the first in the
            sub-experiences stream.
        :param sub_stream_length: the sub-stream length.
        """
        super().__init__(
            current_experience=current_experience,
            origin_stream=origin_stream,
            benchmark=benchmark,
            dataset=dataset,
            origin_experience=origin_experience,
            subexp_size=subexp_size,
            is_first_subexp=is_first_subexp,
            is_last_subexp=is_last_subexp,
            sub_stream_length=sub_stream_length,
            access_task_boundaries=access_task_boundaries)

        self.classes_in_this_experience: List[int] = \
            classes_in_this_experience


def fixed_size_experience_split(
    experience: DatasetExperience[TClassificationDataset],
    experience_size: int,
    online_benchmark: TOnlineCLScenario,
    shuffle: bool = True,
    drop_last: bool = False,
    access_task_boundaries: bool = False
) -> Generator[
        OnlineClassificationExperience[TClassificationDataset], None, None]:
    """Returns a lazy stream generated by splitting an experience into
    smaller ones.

    Splits the experience into smaller experiences of size `experience_size`.

    :param experience: The experience to split.
    :param experience_size: The experience size (number of instances).
    :param online_benchmark: The online benchmark the sub-experiences will
        belong to.
    :param shuffle: If True, instances will be shuffled before splitting.
    :param drop_last: If True, the last mini-experience will be dropped if
        it does not contain `experience_size` instances.
    :return: A generator yielding the mini-experiences.
    """
    exp_dataset = experience.dataset
    exp_indices = list(range(len(exp_dataset)))
    exp_targets = torch.as_tensor(
        list(exp_dataset.targets),
        dtype=torch.long  # type: ignore
    )

    if shuffle:
        exp_indices = torch.as_tensor(exp_indices)[
            torch.randperm(len(exp_indices))
        ].tolist()

    sub_stream_length = len(exp_indices) // experience_size
    if not drop_last and len(exp_indices) % experience_size > 0:
        sub_stream_length += 1

    init_idx = 0
    is_first = True
    is_last = False
    exp_idx = 0
    while init_idx < len(exp_indices):
        final_idx = init_idx + experience_size  # Exclusive
        if final_idx > len(exp_indices):
            if drop_last:
                break

            final_idx = len(exp_indices)
            is_last = True

        sub_exp_subset = \
            exp_dataset.subset(exp_indices[init_idx:final_idx])
        sub_exp_targets: torch.Tensor = \
            exp_targets[exp_indices[init_idx:final_idx]].unique()

        # origin_stream will be lazily set later
        exp = OnlineClassificationExperience(
            current_experience=exp_idx,
            origin_stream=None,  # type: ignore
            benchmark=online_benchmark,
            dataset=sub_exp_subset,
            origin_experience=experience,
            classes_in_this_experience=sub_exp_targets.tolist(),
            subexp_size=experience_size,
            is_first_subexp=is_first,
            is_last_subexp=is_last,
            sub_stream_length=sub_stream_length,
            access_task_boundaries=access_task_boundaries,
        )

        is_first = False
        yield exp
        init_idx = final_idx
        exp_idx += 1


def _default_online_split(
    online_benchmark,
    shuffle: bool,
    drop_last: bool,
    access_task_boundaries: bool,
    exp: DatasetExperience[TClassificationDataset],
    size: int
):
    return fixed_size_experience_split(
        experience=exp,
        experience_size=size,
        online_benchmark=online_benchmark,
        shuffle=shuffle,
        drop_last=drop_last,
        access_task_boundaries=access_task_boundaries,
    )


def split_online_stream(
    original_stream: Iterable[DatasetExperience[TClassificationDataset]],
    experience_size: int,
    online_benchmark: 'OnlineCLScenario[TClassificationDataset]',
    shuffle: bool = True,
    drop_last: bool = False,
    experience_split_strategy: Optional[Callable[
        [DatasetExperience[TClassificationDataset], int],
        Iterable[OnlineClassificationExperience[TClassificationDataset]]
    ]] = None,
    access_task_boundaries: bool = False
) -> CLStream[DatasetExperience[TClassificationDataset]]:
    """Split a stream of large batches to create an online stream of small
    mini-batches.

    The resulting stream can be used for Online Continual Learning (OCL)
    scenarios (or data-incremental, or other online-based settings).

    For efficiency reasons, the resulting stream is an iterator, generating
    experiences on-demand.

    :param original_stream: The stream with the original data.
    :param experience_size: The size of each experience, as an int. Ignored
        if `experience_split_strategy` is used.
    :param shuffle: If True, experiences will be split by first shuffling
        instances in each experience. This will use the default PyTorch
        random number generator at its current state. Defaults to True.
        Ignored if `experience_split_strategy` is used.
    :param drop_last: If True, if the last experience doesn't contain
        `experience_size` instances, then the last experience will be
        dropped. Defaults to False. Ignored if `experience_split_strategy`
        is used.
    :param experience_split_strategy: A function that implements a custom
        splitting strategy. The function must accept an experience and
        return an experience's iterator. Defaults to None, which means that
        the standard splitting strategy will be used (which creates
        experiences of size `experience_size`). A good starting point to
        understand the mechanism is to look at the implementation of the
        standard splitting function :func:`fixed_size_experience_split`.
    :return: A lazy online stream with experiences of size
        `experience_size`.
    """
    if experience_split_strategy is None:
        # functools.partial is a more compact option
        # However, MyPy does not understand what a partial is -_-
        def default_online_split_wrapper(e, e_sz):
            return _default_online_split(
                online_benchmark,
                shuffle,
                drop_last,
                access_task_boundaries,
                e,
                e_sz
            )

        split_strategy = default_online_split_wrapper
    else:
        split_strategy = experience_split_strategy

    def exps_iter():
        for exp in original_stream:
            for sub_exp in split_strategy(exp, experience_size):
                yield sub_exp

    stream_name: str = getattr(original_stream, "name", "train")
    return CLStream(
        name=stream_name,
        exps_iter=exps_iter(),
        set_stream_info=True,
        benchmark=online_benchmark
    )


def _fixed_size_split(
    online_benchmark: 'OnlineCLScenario',
    experience_size: int,
    access_task_boundaries: bool,
    shuffle: bool,
    s: Iterable[DatasetExperience[TClassificationDataset]]
) -> CLStream[DatasetExperience[TClassificationDataset]]:
    return split_online_stream(
        original_stream=s,
        experience_size=experience_size,
        online_benchmark=online_benchmark,
        access_task_boundaries=access_task_boundaries,
        shuffle=shuffle,
    )
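
# Worked example (illustrative, not part of the original module): given an
# original experience with 103 instances and `experience_size=10`,
# `fixed_size_experience_split` computes `sub_stream_length = 103 // 10 = 10`
# and, since `drop_last=False` and `103 % 10 = 3 > 0`, bumps it to 11: ten
# sub-experiences of size 10 plus a final one of size 3 carrying
# `is_last_subexp=True`. With `drop_last=True` the trailing 3 instances are
# discarded and only 10 sub-experiences are yielded.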
class OnlineCLScenario(CLScenario[CLStream[DatasetExperience[TCLDataset]]]):

    def __init__(
        self,
        original_streams: Iterable[CLStream[DatasetExperience[TCLDataset]]],
        experiences: Optional[Union[
            DatasetExperience[TCLDataset],
            Iterable[DatasetExperience[TCLDataset]]]] = None,
        experience_size: int = 10,
        stream_split_strategy: Literal[
            'fixed_size_split'] = "fixed_size_split",
        access_task_boundaries: bool = False,
        shuffle: bool = True,
    ):
        """Creates an online scenario from an existing CL scenario.

        :param original_streams: The streams from the original CL scenario.
        :param experiences: If None, the online stream will be created
            from the `train_stream` of the original CL scenario, otherwise
            it will create an online stream from the given sequence of
            experiences.
        :param experience_size: The size of each online experience, as an
            int. Ignored if a custom split strategy is used.
        :param stream_split_strategy: The strategy used to split the
            original stream into online experiences. Currently only
            'fixed_size_split' is supported, which creates experiences of
            size `experience_size` through
            :func:`fixed_size_experience_split`.
        :param access_task_boundaries: If True, the attributes related to
            task boundaries such as `is_first_subexp` and `is_last_subexp`
            become accessible during training.
        :param shuffle: If True, experiences will be split by first
            shuffling instances in each experience. Defaults to True.
        """
        if stream_split_strategy != "fixed_size_split":
            raise ValueError("Unknown experience split strategy")

        split_strat = partial(_fixed_size_split, self, experience_size,
                              access_task_boundaries, shuffle)

        streams_dict = {s.name: s for s in original_streams}
        if "train" not in streams_dict:
            raise ValueError("Missing train stream for `original_streams`.")
        if experiences is None:
            online_train_stream = split_strat(streams_dict["train"])
        else:
            if not isinstance(experiences, Iterable):
                experiences = [experiences]
            online_train_stream = split_strat(experiences)

        streams: List[CLStream] = [online_train_stream]
        for s in original_streams:
            s_wrapped = wrap_stream(
                new_name="original_" + s.name,
                new_benchmark=self,
                wrapped_stream=s
            )

            streams.append(s_wrapped)

        super().__init__(
            streams=streams
        )
__all__ = [
    'OnlineCLExperience',
    'OnlineClassificationExperience',
    'fixed_size_experience_split',
    'split_online_stream',
    'OnlineCLScenario'
]
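
# Usage sketch (illustrative, not part of the original module). Assuming an
# existing benchmark such as `SplitMNIST` from `avalanche.benchmarks.classic`
# exposing a `streams` dict and a `train_stream`, an online version of the
# benchmark could be built roughly as follows; names outside this module are
# assumptions, not guarantees:
#
#     from avalanche.benchmarks.classic import SplitMNIST
#
#     benchmark = SplitMNIST(n_experiences=5)
#     ocl_benchmark = OnlineCLScenario(
#         original_streams=list(benchmark.streams.values()),
#         experience_size=10,
#         access_task_boundaries=False,
#     )
#     # The online train stream yields small sub-experiences lazily, while
#     # the original streams remain available as "original_<name>" streams.
#     for sub_exp in ocl_benchmark.train_stream:
#         ...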