################################################################################
# Copyright (c) 2021 ContinualAI.                                              #
# Copyrights licensed under the MIT License.                                   #
# See the accompanying LICENSE file for terms.                                 #
#                                                                              #
# Date: 10-10-2020                                                             #
# Author: Vincenzo Lomonaco                                                    #
# E-mail: contact@continualai.org                                              #
# Website: www.continualai.org                                                 #
################################################################################

""" CORe50 Pytorch Dataset """

import glob
import logging
import os
import pickle as pkl
from pathlib import Path
from typing import Optional, Union
from warnings import warn

from torchvision.datasets.folder import default_loader
from torchvision.transforms import ToTensor

from avalanche.benchmarks.datasets.core50 import core50_data
from avalanche.benchmarks.datasets import default_dataset_location
from avalanche.benchmarks.datasets.downloadable_dataset import (
    DownloadableDataset,
)
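
# Note: ``core50_data.data`` is assumed to be a sequence of
# ``(file_name, url, checksum)`` tuples; this is inferred from how
# ``name[0]``, ``name[1]`` and ``name[2]`` are used in ``_download_dataset``
# below.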


class CORe50Dataset(DownloadableDataset):
    """CORe50 PyTorch Dataset"""

    def __init__(
        self,
        root: Optional[Union[str, Path]] = None,
        *,
        train=True,
        transform=None,
        target_transform=None,
        loader=default_loader,
        download=True,
        mini=False,
        object_level=True,
    ):
        """Creates an instance of the CORe50 dataset.

        :param root: root directory where the dataset data is stored.
            Defaults to None, which means that the default location for
            'core50' will be used.
        :param train: train or test split.
        :param transform: optional transformations to be applied to the
            samples.
        :param target_transform: optional transformation to be applied to
            the targets.
        :param loader: the procedure used to load each instance from
            storage.
        :param download: boolean to automatically download data.
            Defaults to True.
        :param mini: boolean to use the 32x32 version instead of the
            128x128. Defaults to False.
        :param object_level: whether the classification is object-based or
            category-based, i.e. a 50-way or 10-way classification problem.
            Defaults to True (50-way object classification problem).
        """

        if root is None:
            root = default_dataset_location("core50")

        super(CORe50Dataset, self).__init__(
            root, download=download, verbose=True
        )

        self.train = train  # training set or test set
        self.transform = transform
        self.target_transform = target_transform
        self.loader = loader
        self.object_level = object_level
        self.mini = mini

        # Any scenario and run is good here since we just want to load the
        # train images and targets, with no particular order.
        self._scen = "ni"
        self._run = 0
        self._nbatch = 8

        # Download the dataset and initialize metadata
        self._load_dataset()
    def __getitem__(self, index):
        """
        Args:
            index (int): Index

        Returns:
            tuple: (sample, target) where target is the class index of the
                target class.
        """
        target = self.targets[index]
        if self.mini:
            bp = "core50_32x32"
        else:
            bp = "core50_128x128"

        img = self.loader(str(self.root / bp / self.paths[index]))
        if self.transform is not None:
            img = self.transform(img)
        if self.target_transform is not None:
            target = self.target_transform(target)

        return img, target

    def __len__(self):
        return len(self.targets)

    def _download_dataset(self) -> None:
        data2download = core50_data.data

        if self.mini:
            data2download = list(data2download)
            data2download[0] = core50_data.extra_data[1]

        for name in data2download:
            if self.verbose:
                print("Downloading " + name[1] + "...")
            file = self._download_file(name[1], name[0], name[2])
            if name[1].endswith(".zip"):
                if self.verbose:
                    print(f"Extracting {name[0]}...")
                self._extract_archive(file)
                if self.verbose:
                    print("Extraction completed!")

    def _load_metadata(self) -> bool:
        if self.mini:
            bp = "core50_32x32"
        else:
            bp = "core50_128x128"

        if not (self.root / bp).exists():
            return False

        if not (self.root / "batches_filelists").exists():
            return False

        with open(self.root / "paths.pkl", "rb") as f:
            self.train_test_paths = pkl.load(f)

        if self.verbose:
            print("Loading labels...")

        with open(self.root / "labels.pkl", "rb") as f:
            self.all_targets = pkl.load(f)

        # Concatenate the targets of all batches (training batches plus the
        # final test batch) of the reference scenario/run.
        self.train_test_targets = []
        for i in range(self._nbatch + 1):
            self.train_test_targets += self.all_targets[self._scen][
                self._run
            ][i]

        if self.verbose:
            print("Loading LUP...")

        with open(self.root / "LUP.pkl", "rb") as f:
            self.LUP = pkl.load(f)

        if self.verbose:
            print("Loading labels names...")

        with open(self.root / "labels2names.pkl", "rb") as f:
            self.labels2names = pkl.load(f)

        # Gather the indices of the patterns belonging to the requested split.
        self.idx_list = []
        if self.train:
            for i in range(self._nbatch):
                self.idx_list += self.LUP[self._scen][self._run][i]
        else:
            self.idx_list = self.LUP[self._scen][self._run][-1]

        self.paths = []
        self.targets = []
        for idx in self.idx_list:
            self.paths.append(self.train_test_paths[idx])
            div = 1
            if not self.object_level:
                # Category-level labels: 5 object instances per category.
                div = 5
            self.targets.append(self.train_test_targets[idx] // div)

        if not (self.root / "NIC_v2_79_cat").exists():
            self._create_cat_filelists()

        return True

    def _download_error_message(self) -> str:
        all_urls = [name_url[1] for name_url in core50_data.data]

        base_msg = (
            "[CORe50] Error downloading the dataset!\n"
            "You should download data manually using the following links:\n"
        )

        for url in all_urls:
            base_msg += url
            base_msg += "\n"

        base_msg += "and place these files in " + str(self.root)

        return base_msg

    def _create_cat_filelists(self):
        """Generates the corresponding filelists with category-wise labels.

        The default ones are based on the object-level labels from 0 to 49.
        """
        for k, v in core50_data.scen2dirs.items():
            orig_root_path = os.path.join(self.root, v)
            root_path = os.path.join(self.root, v[:-1] + "_cat")
            if not os.path.exists(root_path):
                os.makedirs(root_path)
            for run in range(10):
                cur_path = os.path.join(root_path, "run" + str(run))
                orig_cur_path = os.path.join(orig_root_path, "run" + str(run))
                if not os.path.exists(cur_path):
                    os.makedirs(cur_path)
                for o_filename in glob.glob(
                    os.path.join(orig_cur_path, "*.txt")
                ):
                    _, d_filename = os.path.split(o_filename)
                    with open(o_filename, "r") as orig_f, open(
                        os.path.join(cur_path, d_filename), "w"
                    ) as dst_f:
                        for line in orig_f:
                            path, label = line.split(" ")
                            new_label = self._objlab2cat(int(label), k, run)
                            dst_f.write(path + " " + str(new_label) + "\n")

    def _objlab2cat(self, label, scen, run):
        """Maps an object label to its corresponding category label,
        based on the scenario."""
        if scen == "nc":
            return core50_data.name2cat[
                self.labels2names["nc"][run][label][:-1]
            ]
        else:
            return int(label) // 5
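
# Usage sketch (category-level classification): with ``object_level=False``
# the dataset becomes a 10-way problem, since CORe50 groups 5 object
# instances per category and targets are derived as ``object_label // 5``
# (see ``_load_metadata``). This assumes the data is available, or can be
# downloaded, at the default 'core50' location.
#
#     cat_data = CORe50Dataset(object_level=False)
#     img, target = cat_data[0]
#     assert 0 <= target < 10  # category labels lie in range(10)
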
def CORe50(*args, **kwargs):
    warn(
        "Dataset CORe50 has been renamed CORe50Dataset to prevent confusion "
        "with the CORe50 classic benchmark",
        DeprecationWarning,
        2,
    )
    return CORe50Dataset(*args, **kwargs)


if __name__ == "__main__":
    # This little example script can be used to visualize the first image
    # loaded from the dataset.
    from torch.utils.data.dataloader import DataLoader
    import matplotlib.pyplot as plt
    from torchvision import transforms
    import torch

    train_data = CORe50Dataset(transform=ToTensor())
    test_data = CORe50Dataset(train=False, transform=ToTensor())
    print("Train size: ", len(train_data))
    print("Test size: ", len(test_data))
    print(train_data.labels2names)

    dataloader = DataLoader(train_data, batch_size=1)

    for batch_data in dataloader:
        x, y = batch_data
        plt.imshow(transforms.ToPILImage()(torch.squeeze(x)))
        plt.show()
        print(x.size())
        print(len(y))
        break


__all__ = ["CORe50Dataset", "CORe50"]
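
# Usage sketch (32x32 "mini" variant): ``mini=True`` makes ``__getitem__``
# resolve image paths under ``core50_32x32`` instead of ``core50_128x128``,
# so each image tensor is assumed to have shape (3, 32, 32). This requires
# the mini archive to be downloadable or already present under the root.
#
#     mini_train = CORe50Dataset(mini=True, transform=ToTensor())
#     x, y = mini_train[0]
#     print(x.shape)  # expected: torch.Size([3, 32, 32])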