from abc import abstractmethod, ABC
from contextlib import contextmanager
from copy import copy
from enum import Enum
from types import GeneratorType
from typing import (
    Any,
    Dict,
    Generator,
    Generic,
    Iterable,
    Iterator,
    List,
    Optional,
    Sequence,
    Tuple,
    TypeVar,
    Union,
    overload,
)
import warnings

import numpy as np
from typing_extensions import final

from avalanche.benchmarks.utils import AvalancheDataset
from avalanche.benchmarks.utils.dataset_utils import (
    slice_alike_object_to_indices,
)

# Generic type variables shared by the definitions of this module.
# NOTE: ``TypeVar`` requires the variable name as its first positional
# argument; the name must match the identifier it is assigned to.
T = TypeVar('T')
TCov = TypeVar('TCov', covariant=True)
E = TypeVar('E')

# Dataset
TCLDataset = TypeVar(
    'TCLDataset',
    bound='AvalancheDataset')  # Implementation, defined in utils
TCLDatasetCov = TypeVar(
    'TCLDatasetCov',
    bound='AvalancheDataset',
    covariant=True)  # Implementation, defined in utils

# Scenario
TCLScenario = TypeVar(
    'TCLScenario',
    bound='CLScenario')  # Implementation, defined here
TCLScenarioCov = TypeVar(
    'TCLScenarioCov',
    bound='CLScenario',
    covariant=True)  # Implementation, defined here

# Stream
TCLStream = TypeVar(
    'TCLStream',
    bound='CLStream')  # Implementation, defined here
TCLStreamCov = TypeVar(
    'TCLStreamCov',
    bound='CLStream',
    covariant=True)  # Implementation, defined here
# Used to annotate ``self`` in SequenceCLStream methods, hence the bound.
TSequenceCLStream = TypeVar(
    'TSequenceCLStream',
    bound='SequenceCLStream')  # Implementation, defined here

# Experience
TCLExperience = TypeVar(
    'TCLExperience',
    bound='CLExperience')  # Implementation, defined here
TDatasetExperience = TypeVar(
    'TDatasetExperience',
    bound='DatasetExperience')  # Implementation, defined here

class MaskedAttributeError(ValueError):
    """Signals an access to a masked experience attribute.

    Raised when user code reads an experience attribute that is private
    (hidden) in the mode the experience is currently in.
    """


class ExperienceMode(Enum):
    """ExperienceMode is an enum used to change visibility of experience's
    attributes.

    Example: task labels may be available during training but not evaluation.

    Current modes:
    - TRAIN: training time (e.g. train method in strategies).
    - EVAL: evaluation time (e.g. eval method in strategies).
    - LOGGING: maximum visibility. Useful when computing metrics.
    """

    TRAIN = 1
    EVAL = 2
    LOGGING = 3

class ExperienceAttribute(Generic[TCov]):
    """Experience attributes are used to define data belonging to an
    experience which may only be available at train or eval time.

    For example, experiences often keep a reference to the entire stream,
    which should be accessible only by the loggers and evaluation system,
    but should never be used by the strategy in the train/eval loops.
    """

    def __init__(
            self,
            value: TCov,
            use_in_train: bool = False,
            use_in_eval: bool = False):
        """Init.

        :param value: attribute value.
        :param use_in_train: if True the attribute is available at training
            time.
        :param use_in_eval: if True the attribute is available at evaluation
            time.
        """
        self.value: TCov = value
        self.use_in_train: bool = use_in_train
        self.use_in_eval: bool = use_in_eval
# Experience attributes can be values of a generic type T, or they can be
# wrapped with an ExperienceAttribute object to handle scope visibility.
TExperienceAttribute = Union[T, ExperienceAttribute[T]]
class CLExperience:
    """Base Experience.

    Experiences have an index which track the experience's position
    inside the stream for evaluation purposes.

    Fields registered through :meth:`_as_attributes` are stored wrapped in
    an :class:`ExperienceAttribute`. Reading such a field returns the
    wrapped value when the current :class:`ExperienceMode` allows it and
    raises :class:`MaskedAttributeError` otherwise.
    """

    def __init__(
            self: TCLExperience,
            current_experience: int,
            origin_stream: 'CLStream[TCLExperience]'):
        """Init.

        :param current_experience: experience identifier (the position in
            the origin stream).
        :param origin_stream: stream containing the experience.
        """
        super().__init__()
        self._current_experience: int = current_experience
        """Experience identifier (the position in the origin_stream)."""

        self._origin_stream: 'CLStream[TCLExperience]' = origin_stream
        """Stream containing the experience."""

        self._exp_mode: ExperienceMode = ExperienceMode.LOGGING
        # used to block access to private info (e.g. task labels,
        # past experiences).

        # Number of currently active `no_attribute_masking` contexts.
        self._unmask_context_depth = 0

        # The experience ID is hidden in both train and eval mode
        # (defaults of `_as_attributes`).
        self._as_attributes('_current_experience')

    @property
    def current_experience(self) -> int:
        """Position of this experience in the origin stream."""
        curr_exp = self._current_experience
        CLExperience._check_unset_attribute('current_experience', curr_exp)
        return curr_exp

    @current_experience.setter
    def current_experience(self, exp_id: int):
        self._current_experience = exp_id

    @property
    def origin_stream(self: TCLExperience) -> 'CLStream[TCLExperience]':
        """Stream this experience belongs to."""
        orig_stream = self._origin_stream
        CLExperience._check_unset_attribute('origin_stream', orig_stream)
        return orig_stream

    @origin_stream.setter
    def origin_stream(
            self: TCLExperience,
            stream: 'CLStream[TCLExperience]'):
        self._origin_stream = stream

    @contextmanager
    def no_attribute_masking(self):
        """Context manager that temporarily disables attribute masking.

        Contexts can be nested: masking is restored only when the outermost
        context exits.
        """
        try:
            self._unmask_context_depth += 1
            assert self._unmask_context_depth > 0
            yield
        finally:
            self._unmask_context_depth -= 1
            assert self._unmask_context_depth >= 0

    @property
    def are_attributes_masked(self) -> bool:
        """True when no `no_attribute_masking` context is active."""
        return self._unmask_context_depth == 0

    def __getattribute__(self, item):
        """Custom getattribute.

        Check that ExperienceAttribute are available in train/eval mode.

        :raises MaskedAttributeError: if the attribute is wrapped in an
            ExperienceAttribute that is not visible in the current mode.
        """
        v = super().__getattribute__(item)

        if isinstance(v, ExperienceAttribute):
            if not self.are_attributes_masked:
                return v.value
            elif self._exp_mode == ExperienceMode.TRAIN and v.use_in_train:
                return v.value
            elif self._exp_mode == ExperienceMode.EVAL and v.use_in_eval:
                return v.value
            elif self._exp_mode == ExperienceMode.LOGGING:
                return v.value
            else:
                mode = (
                    "train"
                    if self._exp_mode == ExperienceMode.TRAIN
                    else "eval"
                )
                se = (
                    f"Attribute {item} is not available for the experience "
                    f"in {mode} mode."
                )
                raise MaskedAttributeError(se)
        else:
            return v

    def __setattr__(self, name, value):
        """Custom setattr.

        When ``name`` already holds an ExperienceAttribute and ``value`` is
        a plain object, the wrapped value is replaced in place so that the
        visibility flags are kept.
        """
        try:
            v = self.__dict__[name]
        except KeyError:
            return super().__setattr__(name, value)

        if isinstance(v, ExperienceAttribute):
            if isinstance(value, ExperienceAttribute):
                super().__setattr__(name, value)
            else:
                # NOTE: mutates the wrapper itself; `copy(self)` is shallow,
                # so the wrapper is shared with the copies produced by
                # `train()`, `eval()` and `logging()`.
                v.value = value
        else:
            return super().__setattr__(name, value)

    def _as_attributes(
            self,
            *fields: str,
            use_in_train=False,
            use_in_eval=False):
        """
        Internal method used to transform plain object fields to
        ExperienceAttribute(s).

        This is needed to ensure that static type checkers will not consider
        those fields as being of type "ExperienceAttribute", as this may be
        detrimental on the user experience.

        :param fields: names of the instance fields to wrap.
        :param use_in_train: if True the fields are visible in train mode.
        :param use_in_eval: if True the fields are visible in eval mode.
        :raises RuntimeError: if a field is already an ExperienceAttribute
            whose visibility flags differ from the requested ones.
        """
        for field in fields:
            # Bypass the masking logic: the raw stored object is needed.
            v = super().__getattribute__(field)
            if isinstance(v, ExperienceAttribute):
                if v.use_in_train != use_in_train:
                    raise RuntimeError(
                        f'Experience attribute {field} redefined with '
                        f'incongruent use_in_train field. Was '
                        f'{v.use_in_train}, overridden with {use_in_train}.'
                    )

                if v.use_in_eval != use_in_eval:
                    raise RuntimeError(
                        f'Experience attribute {field} redefined with '
                        f'incongruent use_in_eval field. Was '
                        f'{v.use_in_eval}, overridden with {use_in_eval}.'
                    )
            else:
                setattr(self, field, ExperienceAttribute(
                    value=v,
                    use_in_train=use_in_train,
                    use_in_eval=use_in_eval
                ))

    def train(self: TCLExperience) -> TCLExperience:
        """Return training experience.

        This is a copy of the experience itself where the private data (e.g.
        experience IDs) is removed to avoid its use during training.
        """
        exp = copy(self)
        exp._exp_mode = ExperienceMode.TRAIN
        return exp

    def eval(self: TCLExperience) -> TCLExperience:
        """Return inference experience.

        This is a copy of the experience itself where the inference data (e.g.
        experience IDs) is available.
        """
        exp = copy(self)
        exp._exp_mode = ExperienceMode.EVAL
        return exp

    def logging(self: TCLExperience) -> TCLExperience:
        """Return logging experience.

        This is a copy of the experience itself where all the attributes are
        available. Useful for logging and metric computations.
        """
        exp = copy(self)
        exp._exp_mode = ExperienceMode.LOGGING
        return exp

    @staticmethod
    def _check_unset_attribute(attribute_name: str, attribute_value: Any):
        """Assert that ``attribute_value`` is not None.

        :param attribute_name: name used in the error message.
        :param attribute_value: the value to check.
        """
        assert attribute_value is not None, (
            f'Attribute {attribute_name} '
            'not set. This is unexpected and usually linked to errors '
            'in the implementation of the stream\'s experience factory.'
        )
class DatasetExperience(CLExperience, Generic[TCLDataset], ABC):
    """Base experience carrying a dataset.

    On top of the fields of :class:`CLExperience`, it keeps a reference to
    the benchmark and to the dataset of the experience, and derives the
    task label(s) from the dataset.
    """

    def __init__(
            self: TDatasetExperience,
            current_experience: int,
            origin_stream: 'CLStream[TDatasetExperience]',
            benchmark: 'CLScenario',
            dataset: TCLDataset):
        """Init.

        :param current_experience: the experience ID, as an integer.
        :param origin_stream: the stream this experience belongs to.
        :param benchmark: the benchmark this experience belongs to.
        :param dataset: the dataset of this experience.
        """
        super().__init__(
            current_experience=current_experience,
            origin_stream=origin_stream)

        self._benchmark: 'CLScenario' = benchmark
        self._dataset: TCLDataset = dataset

    @property
    def benchmark(self) -> 'CLScenario':
        """The benchmark this experience belongs to."""
        bench = self._benchmark
        CLExperience._check_unset_attribute(
            'benchmark', bench
        )
        return bench

    @benchmark.setter
    def benchmark(self, bench: 'CLScenario'):
        self._benchmark = bench

    @property
    def dataset(self) -> TCLDataset:
        """The dataset of this experience."""
        data = self._dataset
        CLExperience._check_unset_attribute(
            'dataset', data
        )
        return data

    @dataset.setter
    def dataset(self, d: TCLDataset):
        self._dataset = d

    @property
    def scenario(self) -> 'CLScenario':
        """This property is DEPRECATED."""
        warnings.warn(
            "Using self.scenario is deprecated in Experience. "
            "Consider using self.benchmark instead.",
            stacklevel=2,
        )
        return self.benchmark

    @property
    def task_label(self) -> int:
        """
        The task label. This value will never have value "None". However,
        for scenarios that don't produce task labels a placeholder value like 0
        is usually set. Beware that this field is meant as a shortcut to obtain
        a unique task label: it assumes that only patterns labeled with a
        single task label are present. If this experience contains patterns from
        multiple tasks, accessing this property will result in an exception.

        :raises ValueError: if the experience does not contain exactly one
            task label.
        """
        if len(self.task_labels) != 1:
            raise ValueError(
                "The task_label property can only be accessed "
                "when the experience contains a single task label"
            )

        return self.task_labels[0]

    @property
    def task_labels(self) -> List[int]:
        """The distinct task labels found in the dataset.

        Computed from the `targets_task_labels` field of the dataset.
        """
        task_labels = getattr(
            self.dataset, 'targets_task_labels', None)

        assert task_labels is not None, \
            ('In its default implementation, DatasetExperience will use '
             'the dataset `targets_task_labels` field to compute the '
             'content of the `task_label(s)` field. The given dataset does '
             'not contain such field.')

        return list(set(task_labels))


class AbstractClassTimelineExperience(
    DatasetExperience[TCLDataset], ABC
):
    """
    Definition of a learning experience. A learning experience contains a set of
    patterns which has become available at a particular time instant. The
    content and size of an Experience is defined by the specific benchmark that
    creates the experience.

    For instance, an experience of a New Classes scenario will contain all
    patterns belonging to a subset of classes of the original training set. An
    experience of a New Instance scenario will contain patterns from previously
    seen classes.
    """

    def __init__(
        self: TDatasetExperience,
        origin_stream: 'CLStream[TDatasetExperience]',
        dataset: TCLDataset,
        current_experience: int,
        classes_in_this_exp: Optional[Sequence[int]],
        previous_classes: Optional[Sequence[int]],
        classes_seen_so_far: Optional[Sequence[int]],
        future_classes: Optional[Sequence[int]],
    ):
        """
        Creates an instance of an experience given the benchmark
        stream, the current experience ID and data about the classes timeline.

        :param origin_stream: The stream from which this experience was
            obtained.
        :param dataset: The dataset of this experience.
        :param current_experience: The current experience ID, as an integer.
        :param classes_in_this_exp: The list of classes in this experience.
        :param previous_classes: The list of classes in previous experiences.
        :param classes_seen_so_far: List of classes of current and previous
            experiences.
        :param future_classes: The list of classes of next experiences.
        """

        self.classes_in_this_experience: Optional[Sequence[int]] = \
            classes_in_this_exp
        """ The list of classes in this experience """

        self.previous_classes: Optional[Sequence[int]] = \
            previous_classes
        """ The list of classes in previous experiences """

        self.classes_seen_so_far: Optional[Sequence[int]] = \
            classes_seen_so_far
        """ List of classes of current and previous experiences """

        self.future_classes: Optional[Sequence[int]] = \
            future_classes
        """ The list of classes of next experiences """

        # The benchmark is taken from the origin stream.
        super().__init__(
            current_experience=current_experience,
            origin_stream=origin_stream,
            benchmark=origin_stream.benchmark,  # type: ignore
            dataset=dataset
        )


class GeneratorMemo(Generic[T]):
    """Re-iterable wrapper around a generator.

    Items produced by the generator are cached, so the wrapper can be
    iterated more than once: each iteration first replays the cached items
    and then keeps consuming the underlying generator, if not exhausted.
    """

    def __init__(self, generator: Generator[T, None, None]):
        """Init.

        :param generator: the generator to wrap.
        """
        # Set to None once the generator is exhausted.
        self._generator: Optional[Generator[T, None, None]] = generator
        self._already_generated: List[T] = []

    def __iter__(self):
        idx = 0
        while True:
            if idx < len(self._already_generated):
                # Replay an item that was already produced.
                yield self._already_generated[idx]
            else:
                if self._generator is None:
                    break
                try:
                    next_item = next(self._generator)
                except StopIteration:
                    self._generator = None
                    break
                self._already_generated.append(next_item)
                yield next_item
            idx += 1
class CLStream(Generic[TCLExperience]):
    """A CL stream is a named iterator of experiences.

    In general, many streams may be generator and not explicit lists to avoid
    keeping many objects in memory.

    NOTE: streams should not be used by training strategies since they
    provide access to past, current, and future data.
    """

    def __init__(
        self: TCLStream,
        name: str,
        exps_iter: Iterable[TCLExperience],
        benchmark: 'CLScenario[TCLStream]',
        set_stream_info: bool = True,
    ):
        """
        Creates an instance of a experience stream.

        :param name: The name of the stream.
        :param exps_iter: The iterable from which experiences will be
            obtained.
        :param benchmark: The benchmarks defining this stream.
        :param set_stream_info: If True, will set the `current_experience`
            and `origin_stream` fields on experience objects before returning
            them. Defaults to True.
        """

        self.name: str = name
        """
        The name of the stream (for instance: "train", "test", "valid", ...).
        """

        self.exps_iter: Iterable[TCLExperience] = exps_iter
        """
        The iterable from which experiences will be obtained.
        """

        self.benchmark: 'CLScenario[TCLStream]' = benchmark
        """
        A reference to the benchmark.
        """

        self.set_stream_info: bool = set_stream_info
        """
        If True, will set the `current_experience` and `origin_stream` fields
        on experience objects before returning them.
        """

        if isinstance(self.exps_iter, GeneratorType):
            # Prevent issues when iterating the stream more than once
            self.exps_iter = GeneratorMemo(self.exps_iter)

    def __iter__(self) -> Iterator[TCLExperience]:
        """Iterate over the experiences of the stream.

        When `set_stream_info` is True, the position in the stream and a
        reference to the stream are set on each experience before it is
        yielded.
        """
        exp: TCLExperience
        for i, exp in enumerate(self.exps_iter):
            if self.set_stream_info:
                exp.current_experience = i
                exp.origin_stream = self  # type: ignore
            yield exp
class SizedCLStream(CLStream[TCLExperience], ABC):
    """
    Abstract class for defining CLStreams whose size
    (number of experiences) is known.
    """

    def __init__(
        self: TCLStream,
        name: str,
        exps_iter: Iterable[TCLExperience],
        benchmark: 'CLScenario[TCLStream]',
        set_stream_info: bool = True,
    ):
        """Init. Parameters are forwarded unchanged to the parent class.

        :param name: The name of the stream.
        :param exps_iter: The iterable from which experiences will be
            obtained.
        :param benchmark: The benchmarks defining this stream.
        :param set_stream_info: If True, will set the `current_experience`
            and `origin_stream` fields on experience objects before returning
            them. Defaults to True.
        """
        super().__init__(
            name=name,
            exps_iter=exps_iter,
            benchmark=benchmark,
            set_stream_info=set_stream_info)

    @abstractmethod
    def __len__(self) -> int:
        """
        Gets the number of experiences this stream it's made of.

        :return: The number of experiences in this stream.
        """
        pass


class SequenceCLStream(
        SizedCLStream[TCLExperience],
        Sequence[TCLExperience],
        ABC):
    """
    Defines a stream that behaves like a :class:`Sequence`.

    This is the most common base class for streams in Avalanche as it
    implements the basic indexing and slicing functionalities for streams.
    """

    def __init__(
        self,
        name: str,
        benchmark: 'CLScenario',
        set_stream_info: bool = True,
        slice_ids: Optional[Iterable[int]] = None
    ):
        """Init.

        :param name: The name of the stream.
        :param benchmark: The benchmarks defining this stream.
        :param set_stream_info: If True, will set the `current_experience`
            and `origin_stream` fields on experience objects before returning
            them. Defaults to True.
        :param slice_ids: The indices (in the original stream) of the
            experiences to include. Defaults to None, which means that this
            object is the original stream.
        """
        self.slice_ids: Optional[List[int]] = \
            list(slice_ids) if slice_ids is not None else None
        """
        Describes which experiences are contained in the current stream slice.
        Can be None, which means that this object is the original stream.
        """

        # The stream is its own experience iterable.
        super().__init__(
            name=name,
            exps_iter=self,
            benchmark=benchmark,
            set_stream_info=set_stream_info)

    def __iter__(self) -> Iterator[TCLExperience]:
        exp: TCLExperience
        for i in range(len(self)):
            exp = self[i]
            yield exp

    @overload
    def __getitem__(self, item: int) -> TCLExperience:
        ...

    @overload
    def __getitem__(self: TSequenceCLStream, item: slice) -> \
            TSequenceCLStream:
        ...

    @final
    def __getitem__(self: TSequenceCLStream, item: Union[int, slice]) -> \
            Union[TSequenceCLStream, TCLExperience]:
        """Get an experience (integer index) or a sub-stream (slice).

        Negative integer indices count from the end of the stream.

        :raises IndexError: if an integer index is out of bounds.
        """
        # This check allows CL streams slicing
        if isinstance(item, (int, np.integer)):
            requested = int(item)
            stream_len = len(self)
            # Normalize negative indices so that `_make_experience` and
            # `current_experience` always receive a non-negative position.
            item = requested + stream_len if requested < 0 else requested
            if not 0 <= item < stream_len:
                raise IndexError(
                    "Experience index out of bounds: " + str(requested)
                )

            curr_exp = item if self.slice_ids is None else self.slice_ids[item]

            exp = self._make_experience(curr_exp)
            if self.set_stream_info:
                exp.current_experience = curr_exp
                exp.origin_stream = self  # type: ignore
            return exp
        else:
            new_slice = self._forward_slice(self.slice_ids, item)
            return self._make_slice(new_slice)

    def __len__(self) -> int:
        """
        Gets the number of experiences this stream it's made of.

        :return: The number of experiences in this stream.
        """
        if self.slice_ids is not None:
            return len(self.slice_ids)
        else:
            return self._full_length()

    def _forward_slice(self, *slices: Union[None, slice, Iterable[int]]) -> \
            Optional[Iterable[int]]:
        """Compose slices, in order, into indices of the original stream.

        :param slices: slice-alike objects to apply one after the other.
            None elements are ignored.
        :return: the resulting indices, or None if no slice was given.
        """
        any_slice = False
        indices = list(range(self._full_length()))
        for sl in slices:
            if sl is None:
                continue
            any_slice = True

            # Each slice is relative to the result of the previous ones.
            slice_indices = slice_alike_object_to_indices(
                slice_alike_object=sl,
                max_length=len(indices)
            )

            new_indices = [indices[x] for x in slice_indices]
            indices = new_indices

        if any_slice:
            return indices
        else:
            return None  # No slicing

    @abstractmethod
    def _full_length(self) -> int:
        """
        Gets the number of experiences in the originating stream (that is,
        the non-sliced stream).
        """
        pass

    @abstractmethod
    def _make_experience(self, experience_idx: int) -> \
            TCLExperience:
        """
        Obtain the experience at the given position in the originating
        stream (that is, the non-sliced stream).
        """
        pass

    def _make_slice(
            self: TSequenceCLStream,
            experience_slice: Optional[Iterable[int]]) -> TSequenceCLStream:
        """
        Obtain a sub-stream given a list of indices of the experiences to
        include.

        Experience ids are the ones of the originating stream (that is,
        the non-sliced stream).
        """
        stream_copy = copy(self)
        stream_copy.slice_ids = list(experience_slice) if \
            experience_slice is not None else None
        # `copy` is shallow: when the stream is its own experience iterable
        # the copy would keep iterating the stream it was sliced from.
        if stream_copy.exps_iter is self:
            stream_copy.exps_iter = stream_copy
        return stream_copy
class EagerCLStream(SequenceCLStream[TCLExperience]):
    """A CL stream built from a pre-initialized list of experiences.

    NOTE: streams should not be used by training strategies since they
    provide access to past, current, and future data.
    """

    def __init__(
        self,
        name: str,
        exps: Sequence[TCLExperience],
        benchmark: 'CLScenario',
        set_stream_info: bool = True,
        slice_ids: Optional[Iterable[int]] = None
    ):
        """Create a CL stream given a list of experiences.

        :param name: name of the stream.
        :param exps: list of experiences.
        :param benchmark: a reference to the benchmark.
        :param set_stream_info: if True, set the `origin_stream` and
            `current_experience` identifier for each experience. If False,
            the attributes are left unchanged.
        :param slice_ids: The indices of experiences to include from the
            original stream. Defaults to None. For internal use.
        """
        self._exps: List[TCLExperience] = list(exps)

        super().__init__(
            name=name,
            benchmark=benchmark,
            set_stream_info=set_stream_info,
            slice_ids=slice_ids)

        if self.set_stream_info:
            slice_ids_enum = self.slice_ids if \
                self.slice_ids is not None else range(len(self._exps))

            for i in slice_ids_enum:
                exp = self._exps[i]
                exp.current_experience = i
                exp.origin_stream = self  # type: ignore

        # The stream info was set eagerly above (if requested): disable the
        # flag so that it is not set again when experiences are retrieved.
        self.set_stream_info = False

    @property
    def exps(self) -> Tuple[TCLExperience, ...]:
        """The experiences of this stream (or stream slice), as a tuple."""
        # Iterate `self` rather than `self.exps_iter`: on a sliced copy the
        # latter may still reference the stream the slice was taken from.
        return tuple(self)

    def _full_length(self) -> int:
        """Number of experiences in the non-sliced stream."""
        return len(self._exps)

    def _make_experience(self, experience_idx: int) -> TCLExperience:
        """Return the experience stored at the given original position."""
        return self._exps[experience_idx]
class CLScenario(Generic[TCLStream]):
    """Continual Learning benchmark.

    A Continual Learning benchmark is a container for a set of streams of
    experiences. It may also contain other additional data useful for
    evaluation purposes or logging.

    The content of each experience depends on the underlying problem (data in
    a supervised problem, environments in reinforcement learning, and so on).

    NOTE: benchmarks should not be used by training strategies since they
    provide access to past, current, and future data.
    """

    def __init__(self, streams: Iterable[TCLStream]):
        """Creates an instance of a Continual Learning benchmark.

        Each stream is registered under its name and is also exposed as the
        instance attribute `<name>_stream`.

        :param streams: a list of streams.
        """
        self._streams: Dict[str, TCLStream] = dict()
        # Single pass: `streams` may be a one-shot iterable (e.g. a
        # generator), which would be empty on a second iteration.
        for s in streams:
            self._streams[s.name] = s
            self.__dict__[s.name + "_stream"] = s

    @property
    def streams(self):
        """A copy of the mapping from stream name to stream."""
        # we don't want in-place modifications so we return a copy
        return copy(self._streams)
