API Reference

Dataset Management

omnirec.recsys_data_set.RecSysDataSet(data: Optional[T] = None, meta: _DatasetMeta = _DatasetMeta())

Bases: Generic[T]

Source code in src\omnirec\recsys_data_set.py
def __init__(
    self, data: Optional[T] = None, meta: _DatasetMeta = _DatasetMeta()
) -> None:
    if data:
        self._data = data
    self._meta = meta

use_dataloader(data_set: DataSet | str, raw_dir: Optional[PathLike | str] = None, canon_path: Optional[PathLike | str] = None, force_download=False, force_canonicalize=False) -> RecSysDataSet[RawData] staticmethod

Loads a dataset using a registered DataLoader. If not already done, the dataset is downloaded and canonicalized. Canonicalization drops duplicates, normalizes identifiers, and saves the data in a standardized format.

Parameters:

Name Type Description Default
data_set DataSet | str

The name of the dataset from the DataSet enum. Must be a registered DataLoader name.

required
raw_dir Optional[PathLike | str]

Target directory where the raw data is stored. If not provided, the data is downloaded to the default raw data directory (_DATA_DIR).

None
canon_path Optional[PathLike | str]

Path where the canonicalized data should be saved. If not provided, the data is saved to the default canonicalized data directory (_DATA_DIR / "canon").

None
force_download bool

If True, forces re-downloading of the raw data even if it already exists. Defaults to False.

False
force_canonicalize bool

If True, forces re-canonicalization of the data even if a canonicalized file exists. Defaults to False.

False

Returns:

Type Description
RecSysDataSet[RawData]

RecSysDataSet[RawData]: The loaded dataset in canonicalized RawData format.

Example
# Load the MovieLens 100K dataset using the registered DataLoader
# Download the raw data to the default directory and save the canonicalized data to the default path
dataset = RecSysDataSet.use_dataloader(data_set=DataSet.MovieLens100K)
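
A second, hedged sketch showing the optional arguments; the paths below are placeholders:

# Store the raw download and the canonicalized CSV in custom locations
# and force a fresh download even if the raw files already exist.
dataset = RecSysDataSet.use_dataloader(
    DataSet.MovieLens100K,
    raw_dir="data/raw",
    canon_path="data/canon/ml-100k.csv",
    force_download=True,
)
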
Source code in src\omnirec\recsys_data_set.py
@staticmethod
def use_dataloader(
    data_set: DataSet | str,
    raw_dir: Optional[PathLike | str] = None,  # TODO: Name that right
    canon_path: Optional[PathLike | str] = None,  # TODO: Name that right
    force_download=False,
    force_canonicalize=False,
) -> "RecSysDataSet[RawData]":
    """Loads a dataset using a registered DataLoader. If not already done the data set is downloaded and canonicalized.
    Canonicalization means duplicates are dropped, identifiers are normalized and the data is saved in a standardized format.

    Args:
        data_set (DataSet | str): The name of the dataset from the DataSet enum. Must be a registered DataLoader name.
        raw_dir (Optional[PathLike | str], optional): Target directory where the raw data is stored. If not provided, the data is downloaded to the default raw data directory (_DATA_DIR).
        canon_path (Optional[PathLike | str], optional): Path where the canonicalized data should be saved. If not provided, the data is saved to the default canonicalized data directory (_DATA_DIR / "canon").
        force_download (bool, optional): If True, forces re-downloading of the raw data even if it already exists. Defaults to False.
        force_canonicalize (bool, optional): If True, forces re-canonicalization of the data even if a canonicalized file exists. Defaults to False.

    Returns:
        RecSysDataSet[RawData]: The loaded dataset in canonicalized RawData format.

    Example:
        ```Python
        # Load the MovieLens 100K dataset using the registered DataLoader
        # Download the raw data to the default directory and save the canonicalized data to the default path
        dataset = RecSysDataSet.use_dataloader(data_set=DataSet.MovieLens100K)
        ```
    """
    if isinstance(data_set, DataSet):
        data_set_name = data_set.value
    else:
        data_set_name = data_set
    dataset = RecSysDataSet[RawData]()

    dataset._meta.name = data_set_name

    if canon_path:
        dataset._meta.canon_pth = Path(canon_path)
    else:
        canon_dir = get_data_dir() / "canon"
        canon_dir.mkdir(parents=True, exist_ok=True)
        dataset._meta.canon_pth = (canon_dir / data_set_name).with_suffix(".csv")
    if dataset._meta.canon_pth.exists() and not (
        force_canonicalize or force_download
    ):
        logger.info(
            "Canonicalized data set already exists, skipping download and canonicalization."
        )
        dataset._data = RawData(pd.read_csv(dataset._meta.canon_pth))
        return dataset

    if raw_dir:
        dataset._meta.raw_dir = Path(raw_dir)

    dataset._data = RawData(
        registry._run_loader(data_set_name, force_download, dataset._meta.raw_dir)
    )
    dataset._canonicalize()
    return dataset

save(file: str | PathLike)

Saves the RecSysDataSet object to a file with the default suffix .rsds.

Parameters:

Name Type Description Default
file str | PathLike

The path where the file is saved.

required
Source code in src\omnirec\recsys_data_set.py
def save(self, file: str | PathLike):
    """Saves the RecSysDataSet object to a file with the default suffix .rsds.

    Args:
        file (str | PathLike): The path where the file is saved.
    """
    file = Path(file)
    if not file.suffix:
        file = file.with_suffix(".rsds")
    with zipfile.ZipFile(file, "w", zipfile.ZIP_STORED) as zf:
        if isinstance(self._data, RawData):
            with zf.open("data.csv", "w") as data_file:
                self._data.df.to_csv(data_file, index=False)
            zf.writestr("VARIANT", "RawData")
        elif isinstance(self._data, SplitData):
            for filename, data in zip(
                ["train", "val", "test"],
                [self._data.train, self._data.val, self._data.test],
            ):
                with zf.open(filename + ".csv", "w") as data_file:
                    data.to_csv(data_file, index=False)
            zf.writestr("VARIANT", "SplitData")
        elif isinstance(self._data, FoldedData):
            # TODO: Leveraging the new SplitData.get method this can be simplified:
            def write_fold(fold: int, split: str, data: pd.DataFrame):
                with zf.open(f"{fold}/{split}.csv", "w") as data_file:
                    data.to_csv(data_file, index=False)

            for fold, splits in self._data.folds.items():
                write_fold(fold, "train", splits.train)
                write_fold(fold, "val", splits.val)
                write_fold(fold, "test", splits.test)

            zf.writestr("VARIANT", "FoldedData")

        else:
            logger.critical(
                f"Unknown data variant: {type(self._data).__name__}! Aborting save operation..."
            )
            sys.exit(1)

        zf.writestr("META", json.dumps(asdict(self._meta), default=str))
        # HACK: Very simple versioning implementation in case we change anything in the future
        zf.writestr("VERSION", "1.0.0")

load(file: str | PathLike) -> RecSysDataSet[T] staticmethod

Loads a RecSysDataSet object from a file with the .rsds suffix.

Parameters:

Name Type Description Default
file str | PathLike

The path to the .rsds file.

required

Returns:

Type Description
RecSysDataSet[T]

RecSysDataSet[T]: The loaded RecSysDataSet object.

Source code in src\omnirec\recsys_data_set.py
@staticmethod
def load(file: str | PathLike) -> "RecSysDataSet[T]":
    """Loads a RecSysDataSet object from a file with the .rsds suffix.

    Args:
        file (str | PathLike): The path to the .rsds file.

    Returns:
        RecSysDataSet[T]: The loaded RecSysDataSet object.
    """
    with zipfile.ZipFile(file, "r", zipfile.ZIP_STORED) as zf:
        version = zf.read("VERSION").decode()
        # HACK: Very simple versioning implementation in case we change anything in the future
        if version != "1.0.0":
            logger.critical(f"Unknown rsds-file version: {version}")
            sys.exit(1)

        variant = zf.read("VARIANT").decode()

        if variant == "RawData":
            with zf.open("data.csv", "r") as data_file:
                data = RawData(pd.read_csv(data_file))
        elif variant == "SplitData":
            dfs: list[pd.DataFrame] = []

            for filename in ["train", "val", "test"]:
                with zf.open(filename + ".csv", "r") as data_file:
                    dfs.append(pd.read_csv(data_file))

            data = SplitData(dfs[0], dfs[1], dfs[2])
        elif variant == "FoldedData":
            folds: dict[int, SplitData] = {}

            for p in zf.namelist():
                match = RecSysDataSet._folds_file_pattern.match(p)
                if not match:
                    continue

                fold = match.group(1)
                folds.setdefault(
                    int(fold), SplitData(*[pd.DataFrame() for _ in range(3)])
                )

            # TODO: Leveraging the new FoldedData.from_split_dict method this can be simplified:
            def read_fold(fold: int, split: str) -> pd.DataFrame:
                with zf.open(f"{fold}/{split}.csv", "r") as data_file:
                    return pd.read_csv(data_file)

            for fold, split_data in folds.items():
                split_data.train = read_fold(fold, "train")
                split_data.val = read_fold(fold, "val")
                split_data.test = read_fold(fold, "test")

            data = FoldedData(folds)
        else:
            logger.critical(
                f"Unknown data variant: {variant}! Aborting load operation..."
            )
            sys.exit(1)

        meta = zf.read("META").decode()
        meta = _DatasetMeta(**json.loads(meta))
        return cast(RecSysDataSet[T], RecSysDataSet(data, meta))
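
A matching sketch that restores the file written by save() above:

from omnirec.recsys_data_set import RecSysDataSet

# The stored variant (RawData, SplitData, or FoldedData) is detected from the file.
dataset = RecSysDataSet.load("ml-100k.rsds")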

Data Loaders

omnirec.data_loaders.base.Loader

Bases: ABC

info(name: str) -> DatasetInfo abstractmethod staticmethod

Provide metadata information about the dataset identified by name.

Parameters:

Name Type Description Default
name str

The name under which the loader was registered. Different names may return different DatasetInfo implementations depending on the dataset. This is useful when multiple datasets share the same loading logic but have, for example, different download URLs or checksums.

required

Returns:

Name Type Description
DatasetInfo DatasetInfo

Metadata including download URLs and optional checksum for verification.

Source code in src\omnirec\data_loaders\base.py
@staticmethod
@abstractmethod
def info(name: str) -> DatasetInfo:
    """Provide metadata information about the dataset identified by `name`.

    Args:
        name (str): The name under which the loader was registered. Different names may return different DatasetInfo
            implementations depending on the dataset. This is useful when multiple
            datasets share the same loading logic but have, for example, different
            download URLs or checksums.

    Returns:
        DatasetInfo: Metadata including download URLs and optional checksum for verification.
    """

load(source_dir: Path, name: str) -> pd.DataFrame abstractmethod staticmethod

Loads dataset from the given directory into a pd.DataFrame. The DataFrame should have the standard columns:
- user
- item
- rating
- timestamp

Parameters:

Name Type Description Default
source_dir Path

The directory that contains the downloaded dataset files.

required
name str

The name under which the loader was registered. This allows selecting between different datasets that share the same loading logic but differ in structure or file naming.

required

Returns:

Type Description
DataFrame

pd.DataFrame: Loaded dataset as a pd.DataFrame with expected columns.

Source code in src\omnirec\data_loaders\base.py
@staticmethod
@abstractmethod
def load(source_dir: Path, name: str) -> pd.DataFrame:
    """
    Loads dataset from the given directory into a `pd.DataFrame`.
    The DataFrame should have the standard columns:
    - user
    - item
    - rating
    - timestamp

    Args:
        source_dir (Path): The directory that contains the downloaded dataset files.
        name (str): The name under which the loader was registered. This allows selecting between different
            datasets that share the same loading logic but differ in structure or
            file naming.

    Returns:
        pd.DataFrame: Loaded dataset as a pd.DataFrame with expected columns.
    """

omnirec.data_loaders.base.DatasetInfo(download_urls: Optional[str | list[str]] = None, checksum: Optional[str] = None, download_file_name: Optional[str] = None, verify_tls: bool = True, license_or_registration: bool = False) dataclass

Metadata about a dataset.

Attributes

download_urls : Optional[Union[str, List[str]]]

URL or list of URLs to download the dataset. If a list is provided, the URLs are tried in order until one succeeds (skipping entries on checksum mismatch or HTTP errors).

checksum : Optional[str]

Optional SHA256 checksum to verify the downloaded file's integrity. If provided, the downloaded file is hashed using SHA256 and compared to this value. Use e.g. hashlib.sha256() to compute the checksum in Python:

import hashlib
hasher = hashlib.sha256()
with open("ml-100k.zip", "rb") as f:
    for chunk in iter(lambda: f.read(8192), b""):
        hasher.update(chunk)
print(hasher.hexdigest())

download_file_name : Optional[str]

Optional custom file name to use when saving the downloaded dataset. If not provided, the name is inferred from the URL.

verify_tls : bool

Whether to verify TLS/SSL certificates when downloading. Default is True.

license_or_registration : bool

Indicates whether the dataset requires a license agreement or registration to access. Default is False.
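
Putting the fields together, a hedged construction sketch; the URL and file name below are placeholders, not a real dataset:

from omnirec.data_loaders.base import DatasetInfo

info = DatasetInfo(
    download_urls="https://example.com/data/interactions.zip",  # placeholder URL
    checksum=None,  # optionally the SHA256 hex digest computed as shown above
    download_file_name="interactions.zip",
    verify_tls=True,
    license_or_registration=False,
)
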

omnirec.data_loaders.registry.register_dataloader(names: str | list[str], cls: type[Loader])

Register a data loader class under one or multiple names.

Parameters:

Name Type Description Default
names str | list[str]

Name(s) to register the loader under.

required
cls type[Loader]

Loader class to register. Must inherit from the common Loader base class.

required
Source code in src\omnirec\data_loaders\registry.py
def register_dataloader(names: str | list[str], cls: type[Loader]):
    """Register a data loader class under one or multiple names.

    Args:
        names (str | list[str]): Name(s) to register the loader under.
        cls (type[Loader]): Loader class to register. Must inherit from the common `Loader` base class.
    """
    if type(names) is list:
        for n in names:
            _add_loader(n, cls)
    elif type(names) is str:
        _add_loader(names, cls)

omnirec.data_loaders.registry.list_datasets() -> list[str]

List all registered dataset names.

Returns:

Type Description
list[str]

list[str]: A list of all registered dataset names.

Source code in src\omnirec\data_loaders\registry.py
def list_datasets() -> list[str]:
    """List all registered dataset names.

    Returns:
        list[str]: A list of all registered dataset names.
    """
    return list(_DATA_LOADERS.keys())
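
A short sketch that registers the hypothetical MyCsvLoader sketched earlier under a placeholder name and then lists the result:

from omnirec.data_loaders.registry import list_datasets, register_dataloader

register_dataloader("my-dataset", MyCsvLoader)
print(list_datasets())  # the list now includes "my-dataset"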

Preprocessing Pipeline

omnirec.preprocess.base.Preprocessor()

Bases: ABC, Generic[T, U]

Source code in src\omnirec\preprocess\base.py
def __init__(self) -> None:
    super().__init__()

process(dataset: RecSysDataSet[T]) -> RecSysDataSet[U] abstractmethod

Processes the dataset and returns a new dataset variant.

Parameters:

Name Type Description Default
dataset RecSysDataSet[T]

The dataset to process.

required

Returns:

Type Description
RecSysDataSet[U]

RecSysDataSet[U]: The processed dataset.

Source code in src\omnirec\preprocess\base.py
@abstractmethod
def process(self, dataset: RecSysDataSet[T]) -> RecSysDataSet[U]:
    """Processes the dataset and returns a new dataset variant.

        Args:
            dataset (RecSysDataSet[T]): The dataset to process.

        Returns:
            RecSysDataSet[U]: The processed dataset.
    """
    pass

omnirec.preprocess.subsample.Subsample(sample_size: int | float)

Bases: Preprocessor[RawData, RawData]

Subsamples the dataset to a specified size.

Parameters:

Name Type Description Default
sample_size int | float

The size of the sample to draw from the dataset. int: The absolute number of interactions to include in the sample. float: The fraction of the dataset to include in the sample (between 0 and 1).

required
Source code in src\omnirec\preprocess\subsample.py
def __init__(self, sample_size: int | float) -> None:
    """Subsamples the dataset to a specified size.

    Args:
        sample_size (int | float): The size of the sample to draw from the dataset.
                                    int: The absolute number of interactions to include in the sample.
                                    float: The fraction of the dataset to include in the sample (between 0 and 1).
    """
    super().__init__()
    self.sample_size = sample_size

omnirec.preprocess.core_pruning.CorePruning(core: int)

Bases: Preprocessor[RawData, RawData]

Prune the dataset to the specified core. Core pruning with a threshold of e.g. 5 means that only users and items with at least 5 interactions are included in the pruned dataset.

Parameters:

Name Type Description Default
core int

The core threshold for pruning.

required
Source code in src\omnirec\preprocess\core_pruning.py
def __init__(self, core: int) -> None:
    """Prune the dataset to the specified core.
    Core pruning with a threshold of e.g. 5 means that only users and items with at least 5 interactions are included in the pruned dataset.

    Args:
        core (int): The core threshold for pruning.
    """
    super().__init__()
    self.core = core

omnirec.preprocess.feedback_conversion.MakeImplicit(threshold: int | float)

Bases: Preprocessor[RawData, RawData]

Converts explicit feedback to implicit feedback using the specified threshold.

Parameters:

Name Type Description Default
threshold int | float

The threshold for converting feedback. int: Used directly as the threshold, e.g. 3 -> only interactions with a rating of 3 or higher are included. float: Interpreted as a fraction of the maximum rating, e.g. 0.5 -> only interactions with a rating of at least 50% of the maximum rating are included.

required
Source code in src\omnirec\preprocess\feedback_conversion.py
def __init__(self, threshold: int | float) -> None:
    """Converts explicit feedback to implicit feedback using the specified threshold.

    Args:
        threshold (int | float): The threshold for converting feedback.
                                    int: Used directly as the threshold, e.g. 3 -> only interactions with a rating of 3 or higher are included.
                                    float: Interpreted as a fraction of the maximum rating, e.g. 0.5 -> only interactions with a rating of at least 50% of the maximum rating are included.
    """
    super().__init__()
    self.threshold = threshold
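
A brief sketch, assuming dataset is a RecSysDataSet[RawData] (e.g. from use_dataloader); each preprocessor can also be applied on its own outside a Pipe:

from omnirec.preprocess.core_pruning import CorePruning
from omnirec.preprocess.feedback_conversion import MakeImplicit

dataset = MakeImplicit(3).process(dataset)  # keep interactions rated 3 or higher
dataset = CorePruning(5).process(dataset)   # then prune to the 5-core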

Filtering

omnirec.preprocess.filter.TimeFilter(start: Optional[pd.Timestamp] = None, end: Optional[pd.Timestamp] = None)

Bases: Preprocessor[RawData, RawData]

Filters the interactions by a time range. Only interactions within the specified start and end timestamps are retained.

Parameters:

Name Type Description Default
start Optional[Timestamp]

The start timestamp for the filter. Defaults to None.

None
end Optional[Timestamp]

The end timestamp for the filter. Defaults to None.

None
Source code in src\omnirec\preprocess\filter.py
def __init__(
    self, start: Optional[pd.Timestamp] = None, end: Optional[pd.Timestamp] = None
) -> None:
    """Filters the interactions by a time range. Only interactions within the specified start and end timestamps are retained.

    Args:
        start (Optional[pd.Timestamp], optional): The start timestamp for the filter. Defaults to None.
        end (Optional[pd.Timestamp], optional): The end timestamp for the filter. Defaults to None.
    """
    super().__init__()
    self._start = start
    self._end = end

omnirec.preprocess.filter.RatingFilter(lower: Optional[int | float] = None, upper: Optional[int | float] = None)

Bases: Preprocessor[RawData, RawData]

Filters the interactions by rating values. Only interactions with ratings within the specified lower and upper bounds are retained.

Parameters:

Name Type Description Default
lower Optional[int | float]

The lower bound for the filter. Defaults to None.

None
upper Optional[int | float]

The upper bound for the filter. Defaults to None.

None
Source code in src\omnirec\preprocess\filter.py
def __init__(
    self, lower: Optional[int | float] = None, upper: Optional[int | float] = None
) -> None:
    """Filters the interactions by rating values. Only interactions with ratings within the specified lower and upper bounds are retained.

    Args:
        lower (Optional[int  |  float], optional): The lower bound for the filter. Defaults to None.
        upper (Optional[int  |  float], optional): The upper bound for the filter. Defaults to None.
    """
    super().__init__()
    self._lower = lower
    self._upper = upper
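
A brief sketch combining both filters, again assuming a RecSysDataSet[RawData] named dataset; the cut-off values are placeholders:

import pandas as pd

from omnirec.preprocess.filter import RatingFilter, TimeFilter

dataset = TimeFilter(start=pd.Timestamp("2019-01-01")).process(dataset)  # keep 2019 and newer
dataset = RatingFilter(lower=1, upper=5).process(dataset)                # keep ratings in [1, 5]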

omnirec.preprocess.split.RandomCrossValidation(num_folds: int, validation_size: float | int)

Bases: DataSplit[RawData, FoldedData]

Applies random cross-validation to the dataset. Randomly splits the dataset into training, validation, and test sets for each fold.

Parameters:

Name Type Description Default
num_folds int

The number of folds to use for cross-validation.

required
validation_size float | int

float: The proportion (between 0 and 1) of the dataset to include in the validation split. int: The absolute number of interactions to include in the validation split.

required
Source code in src\omnirec\preprocess\split.py
def __init__(self, num_folds: int, validation_size: float | int) -> None:
    """Applies random cross-validation to the dataset. Randomly splits the dataset into training, validation, and test sets for each fold.

    Args:
        num_folds (int): The number of folds to use for cross-validation.
        validation_size (float | int): float: The proportion (between 0 and 1) of the dataset to include in the validation split.
                                        int: The absolute number of interactions to include in the validation split.
    """
    super().__init__(validation_size)
    self._num_folds = num_folds

omnirec.preprocess.split.RandomHoldout(validation_size: float | int, test_size: float | int)

Bases: DataSplit[RawData, SplitData]

Applies a random holdout split to the dataset. Randomly splits the dataset into training, validation, and test sets.

Parameters:

Name Type Description Default
validation_size float | int

float: The proportion (between 0 and 1) of the dataset to include in the validation split. int: The absolute number of interactions to include in the validation split.

required
test_size float | int

float: The proportion (between 0 and 1) of the dataset to include in the test split. int: The absolute number of interactions to include in the test split.

required
Source code in src\omnirec\preprocess\split.py
def __init__(self, validation_size: float | int, test_size: float | int) -> None:
    """Applies a random holdout split to the dataset. Randomly splits the dataset into training, validation, and test sets.

    Args:
        validation_size (float | int): float: The proportion (between 0 and 1) of the dataset to include in the validation split.
                                        int: The absolute number of interactions to include in the validation split.
        test_size (float | int): float: The proportion (between 0 and 1) of the dataset to include in the test split.
                                    int: The absolute number of interactions to include in the test split.
    """
    super().__init__(validation_size)
    self._test_size = test_size

omnirec.preprocess.split.UserCrossValidation(num_folds: int, validation_size: float | int)

Bases: DataSplit[RawData, FoldedData]

Applies user-based cross-validation to the dataset. Ensures that each user has interactions in the training, validation, and test sets in each fold.

Parameters:

Name Type Description Default
num_folds int

The number of folds to use for cross-validation.

required
validation_size float | int

float: The proportion (between 0 and 1) of the dataset to include in the validation split. int: The absolute number of interactions to include in the validation split.

required
Source code in src\omnirec\preprocess\split.py
def __init__(self, num_folds: int, validation_size: float | int) -> None:
    """Applies user-based cross-validation to the dataset. Ensures that each user has interactions in the training, validation, and test sets in each fold.

    Args:
        num_folds (int): The number of folds to use for cross-validation.
        validation_size (float | int): float: The proportion (between 0 and 1) of the dataset to include in the validation split.
                                        int: The absolute number of interactions to include in the validation split.
    """
    super().__init__(validation_size)
    self._num_folds = num_folds

omnirec.preprocess.split.UserHoldout(validation_size: float | int, test_size: float | int)

Bases: DataSplit[RawData, SplitData]

Applies the user holdout split to the dataset. Ensures that each user has interactions in the training, validation, and test sets.

Parameters:

Name Type Description Default
validation_size float | int

float: The proportion (between 0 and 1) of the dataset to include in the validation split. int: The absolute number of interactions to include in the validation split.

required
test_size float | int

float: The proportion (between 0 and 1) of the dataset to include in the test split. int: The absolute number of interactions to include in the test split.

required
Source code in src\omnirec\preprocess\split.py
def __init__(self, validation_size: float | int, test_size: float | int) -> None:
    """Applies the user holdout split to the dataset. Ensures that each user has interactions in the training, validation, and test sets.

    Args:
        validation_size (float | int): float: The proportion (between 0 and 1) of the dataset to include in the validation split.
                                        int: The absolute number of interactions to include in the validation split.
        test_size (float | int): float: The proportion (between 0 and 1) of the dataset to include in the test split.
                                    int: The absolute number of interactions to include in the test split.
    """
    super().__init__(validation_size)
    self._test_size = test_size

omnirec.preprocess.split.TimeBasedHoldout(validation: float | int | pd.Timestamp, test: float | int | pd.Timestamp)

Bases: DataSplit[RawData, SplitData]

Applies a time-based hold-out split on a dataset. Splits the dataset into a train, test and validation split based on the timestamp. Can either use proportions, absolute numbers or timestamps as cut-off criteria.

Parameters:

Name Type Description Default
validation float | int | Timestamp

float: The proportion (between 0 and 1) of newest interactions in the dataset to include in the validation split. int: The absolute number of newest interactions to include in the validation split. pd.Timestamp: The timestamp to use as a cut-off for the validation split. Interactions after this timestamp (newer) are included in the validation split.

required
test float | int | Timestamp

float: The proportion (between 0 and 1) of newest interactions in the dataset to include in the test split. int: The absolute number of newest interactions to include in the test split. pd.Timestamp: The timestamp to use as a cut-off for the test split. Interactions after this timestamp (newer) are included in the test split.

required
Source code in src\omnirec\preprocess\split.py
def __init__(
    self,
    validation: float | int | pd.Timestamp,
    test: float | int | pd.Timestamp,
) -> None:
    """Applies a time-based hold-out split on a dataset. Splits the dataset into a train, test and validation split based on the timestamp. Can either use proportions, absolute numbers or timestamps as cut-off criteria.

    Args:
        validation (float | int | pd.Timestamp): float: The proportion (between 0 and 1) of newest interactions in the dataset to include in the validation split.
                                                int: The absolute number of newest interactions to include in the validation split.
                                                pd.Timestamp: The timestamp to use as a cut-off for the validation split. Interactions after this timestamp (newer) are included in the validation split.
        test (float | int | pd.Timestamp): float: The proportion (between 0 and 1) of newest interactions in the dataset to include in the test split.
                                            int: The absolute number of newest interactions to include in the test split.
                                            pd.Timestamp: The timestamp to use as a cut-off for the test split. Interactions after this timestamp (newer) are included in the test split.
    """
    super().__init__(0)

    if type(validation) is not type(test):
        self.logger.critical("Validation and test size must be the same type")
        sys.exit(1)

    self._valid_size = validation
    self._test_size = test
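
A brief sketch of both split styles, assuming a RecSysDataSet[RawData] named dataset and applying the splits via process() as in the Pipe example below:

from omnirec.preprocess.split import RandomHoldout, UserCrossValidation

split_dataset = RandomHoldout(0.1, 0.2).process(dataset)       # 10% validation, 20% test
folded_dataset = UserCrossValidation(5, 0.1).process(dataset)  # 5 folds, 10% validation each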

omnirec.preprocess.pipe.Pipe(*steps: Unpack[tuple[Unpack[Ts], Preprocessor[Any, T]]])

Bases: Generic[T]

Pipeline for automatically applying sequential preprocessing steps. Takes a sequence of Preprocessor objects as input. When process() is called, each step's process method is invoked in the order the steps were provided. Example:

    # Define preprocessing steps
    pipe = Pipe(
        Subsample(0.1),
        MakeImplicit(3),
        CorePruning(5),
        UserCrossValidation(5, 0.1),
    )

    # Apply the steps
    dataset = pipe.process(dataset)

Source code in src\omnirec\preprocess\pipe.py
def __init__(self, *steps: Unpack[tuple[Unpack[Ts], Preprocessor[Any, T]]]) -> None:
    """Pipeline for automatically applying sequential preprocessing steps. Takes as input a sequence of Preprocessor objects.
    If process() is called, each step's process method is called in the order they were provided.
    Example:
        ```Python
            # Define preprocessing steps
            pipe = Pipe(
                Subsample(0.1),
                MakeImplicit(3),
                CorePruning(5),
                UserCrossValidation(5, 0.1),
            )

            # Apply the steps
            dataset = pipe.process(dataset)
        ```
    """
    super().__init__()
    self._steps = steps

Evaluation Metrics

omnirec.runner.evaluation.Evaluator(*metrics: Metric)

Initialize the Evaluator with metrics to compute on predictions. The Evaluator computes specified metrics on algorithm predictions and accumulates results across experiments. Use get_tables() to retrieve formatted result tables.

Parameters:

Name Type Description Default
*metrics Metric

One or more metric instances to compute. Common metrics include NDCG, HR (Hit Rate), and Recall. Each metric can be configured with multiple k values (e.g., NDCG([5, 10, 20])).

()
Source code in src\omnirec\runner\evaluation.py
def __init__(self, *metrics: Metric) -> None:
    """Initialize the Evaluator with metrics to compute on predictions.
    The Evaluator computes specified metrics on algorithm predictions and accumulates
    results across experiments. Use `get_tables()` to retrieve formatted result tables.

    Args:
        *metrics (Metric): One or more metric instances to compute. Common metrics include
            NDCG, HR (Hit Rate), and Recall. Each metric can be configured with multiple
            k values (e.g., `NDCG([5, 10, 20])`).
    """
    if not isinstance(metrics, Iterable):
        metrics = [metrics]
    self._metrics = metrics
    self._results: dict[str, DataFrame] = {}
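
A brief construction sketch using the metrics documented below:

from omnirec.metrics.ranking import HR, NDCG, Recall
from omnirec.runner.evaluation import Evaluator

# Compute NDCG and Recall at several cut-offs and HR@10 for every experiment.
evaluator = Evaluator(NDCG([5, 10, 20]), HR(10), Recall([5, 10]))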

Metric Base Classes

omnirec.metrics.base.Metric

Bases: ABC

omnirec.metrics.base.Metric.calculate(predictions: pd.DataFrame, test: pd.DataFrame) -> MetricResult abstractmethod

Source code in src\omnirec\metrics\base.py
@abstractmethod
def calculate(
    self, predictions: pd.DataFrame, test: pd.DataFrame
) -> MetricResult: ...

omnirec.metrics.base.MetricResult(name: str, result: float | dict[int, float]) dataclass

Represents the result of a metric calculation. It holds the metric name as a str and either a single float result or a dictionary of results keyed by k value.
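
A hedged sketch of a custom metric, assuming calculate() is the only abstract member and that both DataFrames carry an item column; the column assumption describes a hypothetical prediction format, not something stated in this reference:

import pandas as pd

from omnirec.metrics.base import Metric, MetricResult


class ItemCoverage(Metric):
    """Hypothetical metric: share of test items that appear anywhere in the predictions."""

    def calculate(self, predictions: pd.DataFrame, test: pd.DataFrame) -> MetricResult:
        covered = predictions["item"].nunique() / max(test["item"].nunique(), 1)
        return MetricResult("ItemCoverage", float(covered))
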

Ranking Metrics

omnirec.metrics.ranking.HR(k: int | list[int])

Bases: RankingMetric

Computes the HR metric. k is the number of top recommendations to consider. It can be a single integer or a list of integers, in which case the metric will be computed for each value of k.

It follows the formula:

\(HR@k = \frac{1}{|U|} \sum_{u \in U} \mathbf{1}\{\text{Rel}(u) \cap \text{Pred}_k(u) \neq \emptyset\}\)

where \(\text{Pred}_k(u)\) is the set of top-k predicted items for user u.
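
For example, if two of three users have at least one relevant item among their top-5 predictions, then \(HR@5 = \frac{2}{3} \approx 0.67\).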

Parameters:

Name Type Description Default
k int | list[int]

The number of top recommendations to consider.

required
Source code in src\omnirec\metrics\ranking.py
def __init__(self, k: int | list[int]) -> None:
    """
    Computes the HR metric. k is the number of top recommendations to consider.
    It can be a single integer or a list of integers, in which case the metric will be computed for each value of k.

    It follows the formula:

    $HR@k = \\frac{1}{|U|} \\sum_{u \\in U} \\mathbf{1}\\{\\text{Rel}(u) \\cap \\text{Pred}_k(u) \\neq \\emptyset\\}$

    where $\\text{Pred}_k(u)$ is the set of top-k predicted items for user u.

    Args:
        k (int | list[int]): The number of top recommendations to consider.
    """
    super().__init__(k)

calculate(predictions: DataFrame, test: DataFrame) -> MetricResult

Calculates the Hit Rate (HR) metric. Considers the top-k predictions for one or multiple k values.

Parameters:

Name Type Description Default
predictions DataFrame

Contains the top k predictions for one or more users.

required
test DataFrame

Contains the ground truth relevant items for one or more users.

required

Returns:

Name Type Description
MetricResult MetricResult

The computed HR scores for each value k. If multiple users are provided, the scores are averaged.

Source code in src\omnirec\metrics\ranking.py
def calculate(self, predictions: DataFrame, test: DataFrame) -> MetricResult:
    """Calculates the Hit Rate (HR) metric. Considers the top-k predictions for one or multiple k values.

    Args:
        predictions (DataFrame): Contains the top k predictions for one or more users.
        test (DataFrame): Contains the ground truth relevant items for one or more users.

    Returns:
        MetricResult: The computed HR scores for each value k. If multiple users are provided, the scores are averaged.
    """
    top_k_dict = self.make_topk_dict(predictions)

    hr_per_user_per_k: dict[int, list] = {}
    # FIXME: Fix metric implementation, adapt to new data format
    for user, (pred, _) in top_k_dict.items():
        positive_test_interactions = test["item"][test["user"] == user].to_numpy()
        hits = np.isin(pred[: max(self._k_list)], positive_test_interactions)
        for k in self._k_list:
            user_hr = hits[:k].sum()
            user_hr = 1 if user_hr > 0 else 0
            hr_per_user_per_k.setdefault(k, []).append(user_hr)
    scores: list[float] = [sum(v) / len(v) for v in hr_per_user_per_k.values()]
    scores_dict = {k: score for k, score in zip(self._k_list, scores)}
    return MetricResult(__class__.__name__, scores_dict)

omnirec.metrics.ranking.NDCG(k: int | list[int])

Bases: RankingMetric

Initializes the NDCG (Normalized Discounted Cumulative Gain) metric. k is the number of top predictions to consider. It can be a single integer or a list of integers, in which case the metric will be computed for each value of k.

The NDCG considers the position of relevant items in a ranked list of predictions.

For a user u, the discounted cumulative gain at cutoff k is

\(DCG@k(u) = \sum_{i=1}^{k} \frac{\mathbf{1}\{\text{pred}_i \in \text{Rel}(u)\}}{\log_2(i+1)}\)

where \(\mathbf{1}\{\cdot\}\) is the indicator function and

\(\text{Rel}(u)\) is the set of relevant items for user u.

The ideal discounted cumulative gain is

\(IDCG@k = \sum_{i=1}^{k} \frac{1}{\log_2(i+1)}\)

The normalized score is

\(NDCG@k(u) = \frac{DCG@k(u)}{IDCG@k}\)

Finally, the reported score is averaged over all users:

\(\text{NDCG@k} = \frac{1}{|U|} \sum_{u \in U} NDCG@k(u)\)
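
For example, with \(k = 3\) and hits at ranks 1 and 3, \(DCG@3(u) = \frac{1}{\log_2 2} + \frac{1}{\log_2 4} = 1.5\) and \(IDCG@3 = 1 + \frac{1}{\log_2 3} + \frac{1}{\log_2 4} \approx 2.13\), giving \(NDCG@3(u) \approx 0.70\) for that user.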

Parameters:

Name Type Description Default
k int | list[int]

The number of top predictions to consider.

required
Source code in src\omnirec\metrics\ranking.py
def __init__(self, k: int | list[int]) -> None:
    """Initializes the NDCG (Normalized Discounted Cumulative Gain) metric. k is the number of top predictions to consider.
    It can be a single integer or a list of integers, in which case the metric will be computed for each value of k.

    The NDCG considers the position of relevant items in a ranked list of predictions.

    For a user u, the discounted cumulative gain at cutoff k is

    $DCG@k(u) = \\sum_{i=1}^{k} \\frac{\\mathbf{1}\\{\\text{pred}_i \\in \\text{Rel}(u)\\}}{\\log_2(i+1)}$

    where $\\mathbf{1}\\{\\cdot\\}$ is the indicator function and

    $\\text{Rel}(u)$ is the set of relevant items for user u.

    The ideal discounted cumulative gain is

    $IDCG@k = \\sum_{i=1}^{k} \\frac{1}{\\log_2(i+1)}$

    The normalized score is

    $NDCG@k(u) = \\frac{DCG@k(u)}{IDCG@k}$

    Finally, the reported score is averaged over all users:

    $\\text{NDCG@k} = \\frac{1}{|U|} \\sum_{u \\in U} NDCG@k(u)$

    Args:
        k (int | list[int]): The number of top predictions to consider.
    """
    super().__init__(k)

calculate(predictions: DataFrame, test: DataFrame) -> MetricResult

Computes the Normalized Discounted Cumulative Gain (NDCG). Considers the top-k predictions for one or multiple k values.

Parameters:

Name Type Description Default
predictions DataFrame

Contains the top k predictions for one or more users.

required
test DataFrame

Contains the ground truth relevant items for one or more users.

required

Returns:

Name Type Description
MetricResult MetricResult

The computed NDCG scores for each value k. If multiple users are provided, the scores are averaged.

Source code in src\omnirec\metrics\ranking.py
def calculate(self, predictions: DataFrame, test: DataFrame) -> MetricResult:
    """Computes the Normalized Discounted Cumulative Gain (NDCG). Considers the top-k predictions for one or multiple k values.

    Args:
        predictions (DataFrame): Contains the top k predictions for one or more users.
        test (DataFrame): Contains the ground truth relevant items for one or more users.

    Returns:
        MetricResult: The computed NDCG scores for each value k. If multiple users are provided, the scores are averaged.
    """
    top_k_dict = self.make_topk_dict(predictions)

    discounted_gain_per_k = np.array(
        [1 / np.log2(i + 1) for i in range(1, max(self._k_list) + 1)]
    )
    ideal_discounted_gain_per_k = [
        discounted_gain_per_k[: ind + 1].sum()
        for ind in range(len(discounted_gain_per_k))
    ]
    ndcg_per_user_per_k: dict[int, list] = {}
    for user, (pred, _) in top_k_dict.items():
        positive_test_interactions = test["item"][test["user"] == user].to_numpy()
        hits = np.isin(pred[: max(self._k_list)], positive_test_interactions)
        user_dcg = np.where(hits, discounted_gain_per_k[: len(hits)], 0)
        for k in self._k_list:
            user_ndcg = user_dcg[:k].sum() / ideal_discounted_gain_per_k[k - 1]
            ndcg_per_user_per_k.setdefault(k, []).append(user_ndcg)

    scores: list[float] = [
        float(sum(v)) / len(v) for v in ndcg_per_user_per_k.values()
    ]
    scores_dict = {k: score for k, score in zip(self._k_list, scores)}
    return MetricResult(__class__.__name__, scores_dict)

omnirec.metrics.ranking.Recall(k: int | list[int])

Bases: RankingMetric

Calculates the average recall at k for one or multiple k values. Recall at k is defined as the proportion of relevant items that are found in the top-k recommendations.

It follows the formula:

\(Recall@k = \frac{1}{|U|} \sum_{u \in U} \frac{|\text{Rel}(u) \cap \text{Pred}_k(u)|}{\min(|\text{Rel}(u)|, k)}\)

where \(\text{Pred}_k(u)\) is the set of top-k predicted items for user u.
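
For example, if a user has four relevant items and two of them appear in the top-5 predictions, \(Recall@5 = \frac{2}{\min(4, 5)} = 0.5\) for that user.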

Parameters:

Name Type Description Default
k int | list[int]

The number of top recommendations to consider.

required
Source code in src\omnirec\metrics\ranking.py
def __init__(self, k: int | list[int]) -> None:
    """Calculates the average recall at k for one or multiple k values. Recall at k is defined as the proportion of relevant items that are found in the top-k recommendations.

    It follows the formula:

    $Recall@k = \\frac{1}{|U|} \\sum_{u \\in U} \\frac{|\\text{Rel}(u) \\cap \\text{Pred}_k(u)|}{\\min(|\\text{Rel}(u)|, k)}$

    where $\\text{Pred}_k(u)$ is the set of top-k predicted items for user u.

    Args:
        k (int | list[int]): The number of top recommendations to consider.
    """
    super().__init__(k)

calculate(predictions: DataFrame, test: DataFrame) -> MetricResult

Calculates the Recall metric. Considers the top-k predictions for one or multiple k values.

Parameters:

Name Type Description Default
predictions DataFrame

Contains the top k predictions for one or more users.

required
test DataFrame

Contains the ground truth relevant items for one or more users.

required

Returns:

Name Type Description
MetricResult MetricResult

The computed Recall scores for each value k. If multiple users are provided, the scores are averaged.

Source code in src\omnirec\metrics\ranking.py
def calculate(self, predictions: DataFrame, test: DataFrame) -> MetricResult:
    """Calculates the Recall metric. Considers the top-k predictions for one or multiple k values.

    Args:
        predictions (DataFrame): Contains the top k predictions for one or more users.
        test (DataFrame): Contains the ground truth relevant items for one or more users.

    Returns:
        MetricResult: The computed Recall scores for each value k. If multiple users are provided, the scores are averaged.
    """
    top_k_dict = self.make_topk_dict(predictions)

    recall_per_user_per_k: dict[int, list] = {}
    for user, (pred, _) in top_k_dict.items():
        positive_test_interactions = test["item"][test["user"] == user].to_numpy()
        hits = np.isin(pred[: max(self._k_list)], positive_test_interactions)
        for k in self._k_list:
            user_recall = hits[:k].sum() / min(len(positive_test_interactions), k)
            recall_per_user_per_k.setdefault(k, []).append(user_recall)
    scores: list[float] = [
        float(sum(v)) / len(v) for v in recall_per_user_per_k.values()
    ]
    scores_dict = {k: score for k, score in zip(self._k_list, scores)}
    return MetricResult(__class__.__name__, scores_dict)

Prediction Metrics

omnirec.metrics.prediction.MAE()

Bases: PredictionMetric

Mean Absolute Error (MAE) metric. Calculates the average of the absolute differences between predicted and actual ratings, according to the formula: \(MAE = \frac{1}{n} \sum_{i=1}^{n} |y_i - \hat{y}_i|\)
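
For example, with actual ratings 4 and 3 and predicted ratings 3.5 and 4.0, \(MAE = \frac{0.5 + 1.0}{2} = 0.75\).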

Source code in src\omnirec\metrics\prediction.py
def __init__(self) -> None:
    """Mean Absolute Error (MAE) metric. Calculates the average of the absolute differences between predicted and actual ratings, according to the formula:
    $MAE = \\frac{1}{n} \\sum_{i=1}^{n} |y_i - \\hat{y}_i|$
    """
    super().__init__()

calculate(predictions: DataFrame, test: DataFrame) -> MetricResult

Calculates the MAE metric.

Parameters:

Name Type Description Default
predictions DataFrame

Contains the predicted ratings.

required
test DataFrame

Contains the ground truth ratings.

required

Returns:

Name Type Description
MetricResult MetricResult

Contains the name of the metric and the computed MAE value.

Source code in src\omnirec\metrics\prediction.py
def calculate(self, predictions: DataFrame, test: DataFrame) -> MetricResult:
    """Calculates the MAE metric.

    Args:
        predictions (DataFrame): Contains the predicted ratings.
        test (DataFrame): Contains the ground truth ratings.

    Returns:
        MetricResult: Contains the name of the metric and the computed MAE value.
    """
    merged = self.merge(predictions, test)
    mae = mean_absolute_error(merged["rating_test"], merged["rating_pred"])
    return MetricResult(__class__.__name__, mae)

omnirec.metrics.prediction.RMSE()

Bases: PredictionMetric

Root Mean Squared Error (RMSE) metric. Calculates the square root of the average of the squared differences between predicted and actual ratings, according to the formula:

\(RMSE = \sqrt{\frac{1}{n} \sum_{i=1}^{n} (y_i - \hat{y}_i)^2}\)
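
For example, with errors of 0.5 and 1.0 on two predictions, \(RMSE = \sqrt{\frac{0.25 + 1.0}{2}} \approx 0.79\), which weights the larger error more heavily than MAE would.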

Source code in src\omnirec\metrics\prediction.py
def __init__(self) -> None:
    """Root Mean Squared Error (RMSE) metric. Calculates the square root of the average of the squared differences between predicted and actual ratings, according to the formula:

    $RMSE = \\sqrt{\\frac{1}{n} \\sum_{i=1}^{n} (y_i - \\hat{y}_i)^2}$
    """
    super().__init__()

calculate(predictions: DataFrame, test: DataFrame) -> MetricResult

Calculate the RMSE metric.

Parameters:

Name Type Description Default
predictions DataFrame

Contains the predicted ratings.

required
test DataFrame

Contains the ground truth ratings.

required

Returns:

Name Type Description
MetricResult MetricResult

Contains the name of the metric and the computed RMSE value.

Source code in src\omnirec\metrics\prediction.py
def calculate(self, predictions: DataFrame, test: DataFrame) -> MetricResult:
    """Calculate the RMSE metric.

    Args:
        predictions (DataFrame): Contains the predicted ratings.
        test (DataFrame): Contains the ground truth ratings.

    Returns:
        MetricResult: Contains the name of the metric and the computed RMSE value.
    """
    merged = self.merge(predictions, test)
    rmse = root_mean_squared_error(merged["rating_test"], merged["rating_pred"])
    return MetricResult(__class__.__name__, rmse)

Experiment Planning

omnirec.runner.plan.ExperimentPlan(plan_name: Optional[str] = None)

Source code in src\omnirec\runner\plan.py
def __init__(self, plan_name: Optional[str] = None):
    self._name = plan_name
    self._config: dict[str, AlgorithmConfig] = {}

add_algorithm(algorithm: Algorithms | str, algorithm_config: Optional[AlgorithmConfig] = None, force=False)

Adds an algorithm to the experiment plan.

Parameters:

Name Type Description Default
algorithm Algorithms | str

The algorithm to add.

required
algorithm_config Optional[AlgorithmConfig]

The configuration for the algorithm. The configuration depends on the library the algorithm originates from; we refer to that library's documentation for details about the available hyperparameters.

None
force bool

Whether to forcefully overwrite an existing algorithm config. Defaults to False.

False
Example
# Create a new experiment plan
plan = ExperimentPlan(plan_name="Example Plan")

# Define algorithm configuration based on the lenskit ItemKNNScorer parameters
lenskit_itemknn = {"max_nbrs": [10, 20], "min_nbrs": 5, "feedback": "implicit"}

# Add algorithm with configuration to the plan
plan.add_algorithm(Algorithms.ItemKNNScorer, lenskit_itemknn)
Source code in src\omnirec\runner\plan.py
def add_algorithm(
    self,
    algorithm: Algorithms | str,
    algorithm_config: Optional[AlgorithmConfig] = None,
    force=False,
):
    """Adds an algorithm to the experiment plan.

    Args:
        algorithm (Algorithms | str): The algorithm to add.
        algorithm_config (Optional[AlgorithmConfig], optional): The configuration for the algorithm. The configuration depends on the library the algorithm originates from; we refer to that library's documentation for details about the available hyperparameters.
        force (bool, optional): Whether to forcefully overwrite an existing algorithm config. Defaults to False.

    Example:
        ```Python
        # Create a new experiment plan
        plan = ExperimentPlan(plan_name="Example Plan")

        # Define algorithm configuration based on the lenskit ItemKNNScorer parameters
        lenskit_itemknn = {"max_nbrs": [10, 20], "min_nbrs": 5, "feedback": "implicit"}

        # Add algorithm with configuration to the plan
        plan.add_algorithm(Algorithms.ItemKNNScorer, lenskit_itemknn)
        ```    
    """
    if isinstance(algorithm, Algorithms):
        algorithm_name = algorithm.value
    else:
        algorithm_name = algorithm
    # TODO: Force option?
    if not algorithm_config:
        algorithm_config = {}
    if algorithm_name in self._config:
        logger.critical(
            f'Config for "{algorithm_name}" already exists! Use "force=True" to overwrite or update it using "update_algorithm_config()"'
        )
        sys.exit(1)

    self._config[algorithm_name] = algorithm_config

update_algorithm(algorithm_name: str, algorithm_config: AlgorithmConfig)

Updates the configuration for an existing algorithm in the experiment plan.

Parameters:

Name Type Description Default
algorithm_name str

The name of the algorithm to update.

required
algorithm_config AlgorithmConfig

The new configuration for the algorithm.

required
Source code in src\omnirec\runner\plan.py
def update_algorithm(self, algorithm_name: str, algorithm_config: AlgorithmConfig):
    """Updates the configuration for an existing algorithm in the experiment plan.

    Args:
        algorithm_name (str): The name of the algorithm to update.
        algorithm_config (AlgorithmConfig): The new configuration for the algorithm.
    """
    if algorithm_name not in self._config:
        self._config[algorithm_name] = algorithm_config
    else:
        self._config[algorithm_name].update(algorithm_config)
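
A short sketch continuing the add_algorithm example above; the name string is assumed to match the registered algorithm name:

# Widen the neighbourhood-size grid for the ItemKNN configuration added earlier.
plan.update_algorithm("ItemKNNScorer", {"max_nbrs": [10, 20, 40]})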

Runner Function

omnirec.util.run.run_omnirec(datasets: RecSysDataSet[T] | Iterable[RecSysDataSet[T]], plan: ExperimentPlan, evaluator: Evaluator, slurm_script: Optional[PathLike | str] = None)

Run the OmniRec framework with the specified datasets, experiment plan, and evaluator.

Parameters:

Name Type Description Default
datasets RecSysDataSet[T] | Iterable[RecSysDataSet[T]]

The dataset(s) to use for the experiment.

required
plan ExperimentPlan

The experiment plan to follow.

required
evaluator Evaluator

The evaluator to use for the experiment.

required
slurm_script Optional[PathLike | str]

Path to a SLURM script used to schedule experiments on an HPC cluster. If not provided, the experiments are run locally in normal mode.

None
Source code in src\omnirec\util\run.py
def run_omnirec(
    datasets: RecSysDataSet[T] | Iterable[RecSysDataSet[T]],
    plan: ExperimentPlan,
    evaluator: Evaluator,  # TODO: Make optional
    slurm_script: Optional[PathLike | str] = None
):
    """Run the OmniRec framework with the specified datasets, experiment plan, and evaluator.

    Args:
        datasets (RecSysDataSet[T] | Iterable[RecSysDataSet[T]]): The dataset(s) to use for the experiment.
        plan (ExperimentPlan): The experiment plan to follow.
        evaluator (Evaluator): The evaluator to use for the experiment.
        slurm_script (Optional[PathLike | str]): Path to a SLURM script used to schedule experiments
            on an HPC cluster. If not provided, the experiments are run locally in normal mode.
    """
    if slurm_script is not None:
        # TODO:
        raise NotImplementedError()

    c = Coordinator()
    c.run(datasets, plan, evaluator)

    for table in evaluator.get_tables():
        console = Console()
        console.print(table)
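
An end-to-end sketch assembled from the pieces documented above; the DataSet and Algorithms enum import paths are not shown in this reference, so treat those two lines as assumptions:

from omnirec.metrics.ranking import NDCG
from omnirec.preprocess.core_pruning import CorePruning
from omnirec.preprocess.pipe import Pipe
from omnirec.preprocess.split import UserHoldout
from omnirec.recsys_data_set import RecSysDataSet
from omnirec.runner.evaluation import Evaluator
from omnirec.runner.plan import ExperimentPlan
from omnirec.util.run import run_omnirec

dataset = RecSysDataSet.use_dataloader(DataSet.MovieLens100K)   # enum import path not shown here
dataset = Pipe(CorePruning(5), UserHoldout(0.1, 0.2)).process(dataset)

plan = ExperimentPlan("quickstart")
plan.add_algorithm(Algorithms.ItemKNNScorer, {"max_nbrs": 20})  # enum import path not shown here

run_omnirec(dataset, plan, Evaluator(NDCG([5, 10])))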

Coordinator Class

omnirec.runner.coordinator.Coordinator(checkpoint_dir: PathLike | str = Path('./checkpoints'), tmp_dir: Optional[PathLike | str] = None)

Initialize the Coordinator for orchestrating recommendation algorithm experiments. The Coordinator manages the execution of experiments across multiple datasets, algorithms, and configurations. It handles environment isolation, checkpointing, progress tracking, and communication with framework-specific runners.

Parameters:

Name Type Description Default
checkpoint_dir PathLike | str

Directory for storing persistent experiment data including model checkpoints, predictions, and progress files. Directory is created if it doesn't exist. Defaults to "./checkpoints".

Path('./checkpoints')
tmp_dir Optional[PathLike | str]

Directory for temporary files such as intermediate CSV exports. If None, a temporary directory is created automatically and cleaned up on exit. Defaults to None.

None
Note
  • Automatically registers default runners (LensKit, RecBole, RecPack) on initialization
  • Generates SSL certificates for secure RPC communication with runner subprocesses
  • The checkpoint directory structure is: checkpoint_dir/dataset-hash/config-hash/
Source code in src\omnirec\runner\coordinator.py
def __init__(
    self,
    checkpoint_dir: PathLike | str = Path("./checkpoints"),
    tmp_dir: Optional[PathLike | str] = None,
) -> None:
    """Initialize the Coordinator for orchestrating recommendation algorithm experiments.
    The Coordinator manages the execution of experiments across multiple datasets, algorithms,
    and configurations. It handles environment isolation, checkpointing, progress tracking,
    and communication with framework-specific runners.

    Args:
        checkpoint_dir (PathLike | str, optional): Directory for storing persistent experiment data
            including model checkpoints, predictions, and progress files. Directory is created if it
            doesn't exist. Defaults to "./checkpoints".
        tmp_dir (Optional[PathLike | str], optional): Directory for temporary files such as intermediate
            CSV exports. If None, a temporary directory is created automatically and cleaned up on exit.
            Defaults to None.

    Note:
        - Automatically registers default runners (LensKit, RecBole, RecPack) on initialization
        - Generates SSL certificates for secure RPC communication with runner subprocesses
        - The checkpoint directory structure is: `checkpoint_dir/dataset-hash/config-hash/`
    """
    self._checkpoint_dir = Path(checkpoint_dir)
    if tmp_dir:
        self._tmp_dir = Path(tmp_dir)
    else:
        self._tmp_dir_obj: Optional[tempfile.TemporaryDirectory[str]] = (
            tempfile.TemporaryDirectory()
        )
        self._tmp_dir = Path(self._tmp_dir_obj.name)

    self._out_reader: Optional[OutputReader] = None
    self._err_reader: Optional[OutputReader] = None

    self._register_default_runners()
    ensure_certs()
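
A brief construction sketch, reusing the dataset, plan, and evaluator from the run_omnirec example above; the checkpoint path is a placeholder:

from omnirec.runner.coordinator import Coordinator

# Checkpoints persist across runs; temporary files go to an auto-cleaned directory.
coordinator = Coordinator(checkpoint_dir="experiments/checkpoints")
evaluator = coordinator.run(dataset, plan, evaluator)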

run(datasets: RecSysDataSet[T] | Iterable[RecSysDataSet[T]], config: ExperimentPlan, evaluator: Evaluator) -> Evaluator

Execute recommendation algorithm experiments across datasets and configurations. Orchestrates the complete experiment lifecycle: environment setup, model training, prediction generation, and evaluation. Supports automatic checkpointing and resuming of interrupted experiments.

Parameters:

Name Type Description Default
datasets RecSysDataSet[T] | Iterable[RecSysDataSet[T]]

Single dataset or list of datasets to run experiments on. Datasets must contain either SplitData (train/val/test) or FoldedData (cross-validation folds). Use preprocessing steps to create these splits.

required
config ExperimentPlan

Experiment configuration specifying algorithms and their hyperparameters. Each algorithm in the plan will be executed with all specified parameter combinations.

required
evaluator Evaluator

Evaluator instance containing metrics to compute on predictions. Results are accumulated across all experiments and accessible via evaluator.get_tables().

required

Returns:

Name Type Description
Evaluator Evaluator

The same evaluator instance passed in, now containing results from all experiments. Use evaluator.get_tables() to retrieve formatted result tables.

Raises:

Type Description
SystemExit

If the experiment plan is empty or if runner/algorithm validation fails.

Note
  • Each algorithm runs in an isolated Python environment with framework-specific dependencies
  • Progress is checkpointed after each phase (Fit, Predict, Eval) for fault tolerance
  • Identical dataset/config combinations are cached and skipped automatically
  • For cross-validation (FoldedData), experiments run sequentially across all folds
  • Runner subprocesses are automatically started and terminated for each algorithm
Source code in src\omnirec\runner\coordinator.py
def run(
    self,
    datasets: RecSysDataSet[T] | Iterable[RecSysDataSet[T]],
    config: ExperimentPlan,
    evaluator: Evaluator,  # TODO: Make optional
) -> Evaluator:
    """Execute recommendation algorithm experiments across datasets and configurations.
    Orchestrates the complete experiment lifecycle: environment setup, model training,
    prediction generation, and evaluation. Supports automatic checkpointing and resuming
    of interrupted experiments.

    Args:
        datasets (RecSysDataSet[T] | Iterable[RecSysDataSet[T]]): Single dataset or list of datasets
            to run experiments on. Datasets must contain either SplitData (train/val/test) or
            FoldedData (cross-validation folds). Use preprocessing steps to create these splits.
        config (ExperimentPlan): Experiment configuration specifying algorithms and their hyperparameters.
            Each algorithm in the plan will be executed with all specified parameter combinations.
        evaluator (Evaluator): Evaluator instance containing metrics to compute on predictions.
            Results are accumulated across all experiments and accessible via `evaluator.get_tables()`.

    Returns:
        Evaluator: The same evaluator instance passed in, now containing results from all experiments.
            Use `evaluator.get_tables()` to retrieve formatted result tables.

    Raises:
        SystemExit: If the experiment plan is empty or if runner/algorithm validation fails.

    Note:
        - Each algorithm runs in an isolated Python environment with framework-specific dependencies
        - Progress is checkpointed after each phase (Fit, Predict, Eval) for fault tolerance
        - Identical dataset/config combinations are cached and skipped automatically
        - For cross-validation (FoldedData), experiments run sequentially across all folds
        - Runner subprocesses are automatically started and terminated for each algorithm
    """
    # TODO: Force fit, pred, eval parameters to overwrite status tracker
    # TODO: Dataset Normalization stuff etc. beforehand
    exception_occurred = False

    if not isinstance(datasets, Iterable):
        datasets = [datasets]

    algorithm_configs = config._get_configs()
    if len(algorithm_configs) == 0:
        logger.critical(
            "Empty configuration. You have to add at least one experiment!"
        )
        sys.exit(1)

    self._evaluator = evaluator
    self._results_path = self._checkpoint_dir / "results.json"
    if self._results_path.exists():
        self._evaluator.load_results(self._results_path)

    for current_algo, current_config_list in algorithm_configs:
        try:
            host, port = self.start_runner(current_algo)

            logger.info("Connecting to runner...")
            conn = rpyc.ssl_connect(
                host,
                port,
                get_key_pth(Side.Client),
                get_cert_pth(Side.Client),
                config={"sync_request_timeout": 600},
            )
            root: RunnerService = conn.root

            for current_dataset in datasets:
                for current_config in current_config_list:
                    dataset_namehash = f"{current_dataset._meta.name}-{self.dataset_hash(current_dataset)[:8]}"
                    config_namehash = f"{current_algo}-{self.config_hash(current_algo, current_config)[:8]}"
                    current_checkpoint_dir = (
                        self._checkpoint_dir / dataset_namehash / config_namehash
                    )
                    current_checkpoint_dir.mkdir(parents=True, exist_ok=True)
                    logger.debug(f"Using checkpoint dir: {current_checkpoint_dir}")

                    current_tmp_dir = (
                        self._tmp_dir / dataset_namehash / config_namehash
                    )

                    current_tmp_dir.mkdir(parents=True, exist_ok=True)
                    logger.debug(f"Using tmp dir: {current_tmp_dir}")

                    progress = RunProgress.load_or_create(
                        self._checkpoint_dir, (dataset_namehash, config_namehash)
                    )

                    if isinstance(current_dataset._data, FoldedData):

                        def get_next_fold():
                            return progress.get_next_fold_or_init(
                                dataset_namehash, config_namehash
                            )

                        def reset_phase():
                            progress.reset_phase(dataset_namehash, config_namehash)

                        def advance_fold():
                            progress.advance_fold(dataset_namehash, config_namehash)

                        next_fold = get_next_fold()

                        for fold in range(
                            next_fold, len(current_dataset._data.folds)
                        ):
                            fold_data = current_dataset._data.folds[fold]

                            files = self.get_file_paths(
                                current_checkpoint_dir, current_tmp_dir, fold
                            )
                            util.splits_to_csv(files[:3], fold_data)

                            self.run_split(
                                root,
                                progress,
                                current_algo,
                                current_config,
                                current_dataset._meta.name,
                                dataset_namehash,
                                config_namehash,
                                *files,
                                fold,
                            )

                            advance_fold()
                            reset_phase()
                    elif isinstance(current_dataset._data, SplitData):
                        files = self.get_file_paths(
                            current_checkpoint_dir, current_tmp_dir
                        )
                        util.splits_to_csv(files[:3], current_dataset._data)

                        self.run_split(
                            root,
                            progress,
                            current_algo,
                            current_config,
                            current_dataset._meta.name,
                            dataset_namehash,
                            config_namehash,
                            *files,
                        )
                    else:
                        logger.critical(
                            "Invalid dataset variant. Dataset has to be either FoldedData or SplitData. Apply a datasplit beforehand"
                        )
                        sys.exit(1)

        except Exception:
            traceback.print_exc()
            exception_occurred = True
        finally:
            if exception_occurred:
                self.stop()
            else:
                self.stop(logger.info)
            # print(self._proc.returncode) # TODO: Handle bad return code?

    self._evaluator.save_results(self._results_path)
    return self._evaluator

Utility Functions

omnirec.util.util.set_random_state(random_state: int) -> None

Set the global random state for reproducibility.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `random_state` | `int` | The random state seed. | required |
Source code in src\omnirec\util\util.py
def set_random_state(random_state: int) -> None:
    """Set the global random state for reproducibility.

    Args:
        random_state (int): The random state seed.
    """
    global _RANDOM_STATE
    _RANDOM_STATE = random_state

omnirec.util.util.get_random_state() -> int

Get the global random state for reproducibility.

Returns:

| Name | Type | Description |
| --- | --- | --- |
| `int` | `int` | The current random state seed. |

Source code in src\omnirec\util\util.py
def get_random_state() -> int:
    """Get the global random state for reproducibility.

    Returns:
        int: The current random state seed.
    """
    return _RANDOM_STATE
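
A minimal usage sketch for the two helpers above, assuming only what is documented here:

```Python
from omnirec.util.util import get_random_state, set_random_state

set_random_state(42)              # fix the global seed before building experiments
assert get_random_state() == 42   # other components can read the same seed back
```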