# Copyright (c) 2022 ContinualAI.                                              #
# Copyrights licensed under the MIT License.                                   #
# See the accompanying LICENSE file for terms.                                 #
#                                                                              #
# Date: 11-04-2022                                                             #
# Author(s): Antonio Carta                                                     #
# E-mail:                                              #
# Website:                                           #
from copy import copy
from enum import Enum
from typing import Generator, List, Iterable, TypeVar, Union, Generic

T = TypeVar("T")
E = TypeVar("E")

class MaskedAttributeError(ValueError):
    """An error that is thrown when the user tries to access experience
    attributes which are private in the current experience's mode"""


class ExperienceMode(Enum):
    """ExperienceMode is an enum used to change visibility of experience's

    Example: task labels may be available during training but not evaluation.

    Current modes:
    - TRAIN: training time (e.g. train method in strategies).
    - INFERENCE: evaluation time (e.g. eval method in strategies).
    - LOGGING: maximum visibility. Useful when computing metrics.

    TRAIN = 1
    EVAL = 2
    LOGGING = 3

[docs]class ExperienceAttribute(Generic[T]): """Experience attributes are used to define data belonging to an experience which may only be available at train or eval time. For example, experiences often keep a reference to the entire stream, which should be accessible only by the loggers and evaluation system, but should never be used by the strategy in the train/eval loops. """
[docs] def __init__( self, value: T, use_in_train: bool = False, use_in_eval: bool = False ): """Init. :param value: attribute value. :param use_in_train: if True the attribute is available at training time. :param use_in_eval: if True the attribute is available at evaluation time. """ self.value = value self.use_in_train = use_in_train self.use_in_eval = use_in_eval
# experience attributes can be values of a generic type T # or they can be wrapped with an ExperienceAttribute object to handle # scope visibility TExperienceAttribute = Union[T, ExperienceAttribute[T]]
[docs]class CLExperience(object): """Base Experience. Experiences have an index which track the experience's position inside the stream for evaluation purposes. """
[docs] def __init__(self, current_experience: int = None, origin_stream=None): super().__init__() self.current_experience: TExperienceAttribute = ExperienceAttribute( current_experience ) """Experience identifier (the position in the origin_stream).""" self.origin_stream: TExperienceAttribute = ExperienceAttribute( origin_stream ) """Stream containing the experience.""" self._exp_mode: ExperienceMode = ExperienceMode.TRAIN
# used to block access to private info (e.g. task labels, # past experiences). def __getattribute__(self, item): """Custom getattribute. Check that ExperienceAttribute are available in train/eval mode. """ v = super().__getattribute__(item) if isinstance(v, ExperienceAttribute): if self._exp_mode == ExperienceMode.TRAIN and v.use_in_train: return v.value elif self._exp_mode == ExperienceMode.EVAL and v.use_in_eval: return v.value elif self._exp_mode == ExperienceMode.LOGGING: return v.value else: mode = ( "train" if self._exp_mode == ExperienceMode.TRAIN else "eval" ) se = ( f"Attribute {item} is not available for the experience " f"in {mode} mode." ) raise MaskedAttributeError(se) else: return v def train(self): """Return training experience. This is a copy of the experience itself where the private data (e.g. experience IDs) is removed to avoid its use during training. """ exp = copy(self) exp._exp_mode = ExperienceMode.TRAIN return exp def eval(self): """Return inference experience. This is a copy of the experience itself where the inference data (e.g. experience IDs) is available. """ exp = copy(self) exp._exp_mode = ExperienceMode.EVAL return exp def logging(self): """Return logging experience. This is a copy of the experience itself where all the attributes are available. Useful for logging and metric computations. """ exp = copy(self) exp._exp_mode = ExperienceMode.LOGGING return exp
[docs]class CLStream(Generic[E]): """A CL stream is a named iterator of experiences. In general, many streams may be generator and not explicit lists to avoid keeping many objects in memory. NOTE: streams should not be used by training strategies since they provide access to past, current, and future data. """
[docs] def __init__( self, name: str, exps_iter: Iterable[CLExperience], benchmark=None, set_stream_info: bool = True, ): = name self.exps_iter = exps_iter self.benchmark = benchmark self.set_stream_info = set_stream_info self._iter = None
def __iter__(self) -> Generator[E, None, None]: def foo(self): for i, exp in enumerate(self.exps_iter): if self.set_stream_info: exp.current_experience = i exp.origin_stream = self yield exp return foo(self)
[docs]class EagerCLStream(CLStream[E]): """A CL stream which is a named list of experiences. Eager streams are indexable and sliceable, like python lists. NOTE: streams should not be used by training strategies since they provide access to past, current, and future data. """
[docs] def __init__( self, name: str, exps: List[CLExperience], benchmark=None, set_stream_info: bool = True, ): """Create a CL stream given a list of experiences. `origin_stream` and `current_experience` are set for each experience in `exps`. :param name: name of the stream. :param exps: list of experiences. :param benchmark: a reference to the benchmark. :param set_stream_info: if True, set the `origin_stream` and `current_experience` identifier for each experience. If False, the attributes are left unchanged. """ super().__init__(name, exps, benchmark, set_stream_info) self.exps = exps for i, e in enumerate(self.exps): if self.set_stream_info: e.origin_stream = self e.current_experience = i
def __getitem__(self, item) -> Union["EagerCLStream[E]", E]: # This check allows CL streams slicing if isinstance(item, slice): return EagerCLStream(, exps=self.exps[item], set_stream_info=False ) else: return self.exps[item] def __len__(self): return len(self.exps)
[docs]class CLScenario: """Continual Learning benchmark. A Continual Learning benchmark is a container for a set of streams of experiences. It may also contain other additional data useful for evaluation purposes or logging. The content of each experience depends on the underlying problem (data in a supervised problem, environments in reinforcement learning, and so on). NOTE: benchmarks should not be used by training strategies since they provide access to past, current, and future data. """
[docs] def __init__(self, streams: List[CLStream]): """Creates an instance of a Continual Learning benchmark. :param streams: a list of streams. """ self._streams = {} for s in streams: self._streams[ + "_stream"] = s for s in streams: self.__dict__[ + "_stream"] = s
@property def streams(self): # we don't want in-place modifications so we return a copy return copy(self._streams)
__all__ = [ "ExperienceAttribute", "CLExperience", "CLStream", "EagerCLStream", "CLScenario", ]