Skip to content

Preprocessing Pipeline

omnirec.preprocess.base.Preprocessor()

Bases: ABC, Generic[T, U]

Source code in src\omnirec\preprocess\base.py
def __init__(self) -> None:
    """Ensures the trace metadata attributes exist on the instance.

    ``_trace_component`` falls back to the concrete class name and
    ``_trace_params`` to an empty dict. Values already present on the
    instance are left untouched.
    """
    # Bare annotations: they declare the attribute types without assigning.
    self._trace_component: str
    self._trace_params: dict[str, Any]

    # Fill in only what is still missing so existing values survive.
    fallbacks = (
        ("_trace_component", type(self).__name__),
        ("_trace_params", {}),
    )
    for attr_name, fallback in fallbacks:
        if not hasattr(self, attr_name):
            setattr(self, attr_name, fallback)
    super().__init__()

process(dataset: RecSysDataSet[T]) -> RecSysDataSet[U]

Processes a dataset and records execution metadata in the lineage.

This is the public entry point for running a preprocessor. It delegates the actual transformation to _process() and wraps it with shared logic such as timing, dataset shape capture, and trace creation.

Parameters:

Name Type Description Default
dataset RecSysDataSet[T]

The dataset to process.

required

Returns:

Type Description
RecSysDataSet[U]

RecSysDataSet[U]: The processed dataset with an appended trace entry.

Source code in src\omnirec\preprocess\base.py
@final
def process(self, dataset: RecSysDataSet[T]) -> RecSysDataSet[U]:
    """Runs this preprocessor on a dataset and records the run in the lineage.

    This is the public entry point. The transformation itself is delegated
    to ``_process()``; this wrapper measures the runtime, captures the
    dataset shape before and after, and appends a ``Trace`` entry to the
    lineage of the returned dataset.

    Args:
        dataset (RecSysDataSet[T]): The dataset to process.

    Returns:
        RecSysDataSet[U]: The processed dataset with an appended trace entry.
    """
    # Capture the input shape up front, before the transformation runs.
    rows_in = dataset.num_interactions()
    cols_in = dataset.num_columns()

    # Only the delegated transformation is timed.
    started = perf_counter()
    result = self._process(dataset)
    elapsed = perf_counter() - started

    entry = Trace(
        component=self._trace_component,
        # Shallow copy: the entry gets its own dict rather than the live one.
        params=self._trace_params.copy(),
        executed_at=datetime.now(UTC),
        runtime=elapsed,
        before_rows=rows_in,
        before_columns=cols_in,
        after_rows=result.num_interactions(),
        after_columns=result.num_columns(),
    )
    result._lineage.append(entry)
    return result

_process(dataset: RecSysDataSet[T]) -> RecSysDataSet[U] abstractmethod

Implementation hook for transforming a dataset.

Subclasses should override this method with the preprocessing logic itself. Callers should use process() rather than invoking this method directly.

Parameters:

Name Type Description Default
dataset RecSysDataSet[T]

The dataset to transform.

required

Returns:

Type Description
RecSysDataSet[U]

RecSysDataSet[U]: The transformed dataset.

Source code in src\omnirec\preprocess\base.py
@abstractmethod
def _process(self, dataset: RecSysDataSet[T]) -> RecSysDataSet[U]:
    """Transforms a dataset; the hook that concrete preprocessors implement.

    Subclasses override this method with the actual preprocessing logic.
    Callers should go through ``process()`` instead of invoking this method
    directly.

    Args:
        dataset (RecSysDataSet[T]): The dataset to transform.

    Returns:
        RecSysDataSet[U]: The transformed dataset.
    """

omnirec.preprocess.subsample.Subsample(sample_size: int | float)

Bases: Preprocessor[RawData, RawData]

Subsamples the dataset to a specified size.

Parameters:

Name Type Description Default
sample_size int | float

The size of the sample to draw from the dataset. If an int, the absolute number of interactions to include in the sample. If a float, the fraction of the dataset to include in the sample (between 0 and 1).

required
Source code in src\omnirec\preprocess\subsample.py
def __init__(self, sample_size: int | float) -> None:
    """Subsamples the dataset to a specified size.

    Args:
        sample_size (int | float): The size of the sample to draw from the
            dataset. An ``int`` is the absolute number of interactions to
            include in the sample; a ``float`` is the fraction of the
            dataset to include in the sample (between 0 and 1).
    """
    super().__init__()
    # Stored as given; this constructor does not check the type or range.
    self.sample_size = sample_size

omnirec.preprocess.core_pruning.CorePruning(core: int)

Bases: Preprocessor[RawData, RawData]

Prune the dataset to the specified core. Core pruning with a threshold of e.g. 5 means that only users and items with at least 5 interactions are included in the pruned dataset.

Parameters:

Name Type Description Default
core int

The core threshold for pruning.

required
Source code in src\omnirec\preprocess\core_pruning.py
def __init__(self, core: int) -> None:
    """Prune the dataset to the specified core.

    Core pruning with a threshold of e.g. 5 means that only users and items
    with at least 5 interactions are included in the pruned dataset.

    Args:
        core (int): The core threshold for pruning.
    """
    super().__init__()
    # Stored as given; this constructor does not validate the threshold.
    self.core = core

omnirec.preprocess.feedback_conversion.MakeImplicit(threshold: int | float)

Bases: Preprocessor[RawData, RawData]

Converts explicit feedback to implicit feedback using the specified threshold.

Parameters:

Name Type Description Default
threshold int | float

The threshold for converting feedback. If an int, it is used directly as the threshold, e.g. 3 -> only interactions with a rating of 3 or higher are included. If a float, it is interpreted as a fraction of the maximum rating, e.g. 0.5 -> only interactions with a rating of at least 50% of the maximum rating are included.

required
Source code in src\omnirec\preprocess\feedback_conversion.py
def __init__(self, threshold: int | float) -> None:
    """Converts explicit feedback to implicit feedback using the specified threshold.

    Args:
        threshold (int | float): The threshold for converting feedback.
            An ``int`` is used directly as the threshold, e.g. 3 -> only
            interactions with a rating of 3 or higher are included. A
            ``float`` is interpreted as a fraction of the maximum rating,
            e.g. 0.5 -> only interactions with a rating of at least 50% of
            the maximum rating are included.
    """
    super().__init__()
    # Stored as given; the int/float distinction is not resolved here.
    self.threshold = threshold

Filtering

omnirec.preprocess.filter.TimeFilter(start: Optional[pd.Timestamp] = None, end: Optional[pd.Timestamp] = None)

Bases: Preprocessor[RawData, RawData]

Filters the interactions by a time range. Only interactions within the specified start and end timestamps are retained.

Parameters:

Name Type Description Default
start Optional[Timestamp]

The start timestamp for the filter. Defaults to None.

None
end Optional[Timestamp]

The end timestamp for the filter. Defaults to None.

None
Source code in src\omnirec\preprocess\filter.py
def __init__(
    self, start: Optional[pd.Timestamp] = None, end: Optional[pd.Timestamp] = None
) -> None:
    """Filters the interactions by a time range.

    Only interactions within the specified start and end timestamps are
    retained.

    Args:
        start (Optional[pd.Timestamp], optional): The start timestamp for
            the filter. Defaults to None.
        end (Optional[pd.Timestamp], optional): The end timestamp for the
            filter. Defaults to None.
    """
    super().__init__()
    # Both bounds are stored as given, None included.
    self._start, self._end = start, end

omnirec.preprocess.filter.RatingFilter(lower: Optional[int | float] = None, upper: Optional[int | float] = None)

Bases: Preprocessor[RawData, RawData]

Filters the interactions by rating values. Only interactions with ratings within the specified lower and upper bounds are retained.

Parameters:

Name Type Description Default
lower Optional[int | float]

The lower bound for the filter. Defaults to None.

None
upper Optional[int | float]

The upper bound for the filter. Defaults to None.

None
Source code in src\omnirec\preprocess\filter.py
def __init__(
    self, lower: Optional[int | float] = None, upper: Optional[int | float] = None
) -> None:
    """Filters the interactions by rating values.

    Only interactions with ratings within the specified lower and upper
    bounds are retained.

    Args:
        lower (Optional[int | float], optional): The lower bound for the
            filter. Defaults to None.
        upper (Optional[int | float], optional): The upper bound for the
            filter. Defaults to None.
    """
    super().__init__()
    # Both bounds are stored as given, None included.
    self._lower, self._upper = lower, upper

omnirec.preprocess.split.RandomCrossValidation(num_folds: int, validation_size: float | int)

Bases: DataSplit[RawData, FoldedData]

Applies random cross-validation to the dataset. Randomly splits the dataset into training, validation, and test sets for each fold.

Parameters:

Name Type Description Default
num_folds int

The number of folds to use for cross-validation.

required
validation_size float | int

float: The proportion (between 0 and 1) of the dataset to include in the validation split. int: The absolute number of interactions to include in the validation split.

required
Source code in src\omnirec\preprocess\split.py
def __init__(self, num_folds: int, validation_size: float | int) -> None:
    """Applies random cross-validation to the dataset.

    Randomly splits the dataset into training, validation, and test sets
    for each fold.

    Args:
        num_folds (int): The number of folds to use for cross-validation.
        validation_size (float | int): A ``float`` is the proportion
            (between 0 and 1) of the dataset to include in the validation
            split; an ``int`` is the absolute number of interactions to
            include in the validation split.
    """
    # The validation size goes to the base initializer; only the fold count
    # is kept on this class.
    super().__init__(validation_size)
    self._num_folds = num_folds

omnirec.preprocess.split.RandomHoldout(validation_size: float | int, test_size: float | int)

Bases: DataSplit[RawData, SplitData]

Applies a random holdout split to the dataset. Randomly splits the dataset into training, validation, and test sets.

Parameters:

Name Type Description Default
validation_size float | int

float: The proportion (between 0 and 1) of the dataset to include in the validation split. int: The absolute number of interactions to include in the validation split.

required
test_size float | int

float: The proportion (between 0 and 1) of the dataset to include in the test split. int: The absolute number of interactions to include in the test split.

required
Source code in src\omnirec\preprocess\split.py
def __init__(self, validation_size: float | int, test_size: float | int) -> None:
    """Applies a random holdout split to the dataset.

    Randomly splits the dataset into training, validation, and test sets.

    Args:
        validation_size (float | int): A ``float`` is the proportion
            (between 0 and 1) of the dataset to include in the validation
            split; an ``int`` is the absolute number of interactions to
            include in the validation split.
        test_size (float | int): A ``float`` is the proportion (between 0
            and 1) of the dataset to include in the test split; an ``int``
            is the absolute number of interactions to include in the test
            split.
    """
    # The validation size goes to the base initializer; the test size is
    # kept on this class.
    super().__init__(validation_size)
    self._test_size = test_size

omnirec.preprocess.split.UserCrossValidation(num_folds: int, validation_size: float | int)

Bases: DataSplit[RawData, FoldedData]

Applies user-based cross-validation to the dataset. Ensures that each user has interactions in the training, validation, and test sets in each fold.

Parameters:

Name Type Description Default
num_folds int

The number of folds to use for cross-validation.

required
validation_size float | int

float: The proportion (between 0 and 1) of the dataset to include in the validation split. int: The absolute number of interactions to include in the validation split.

required
Source code in src\omnirec\preprocess\split.py
def __init__(self, num_folds: int, validation_size: float | int) -> None:
    """Applies user-based cross-validation to the dataset.

    Ensures that each user has interactions in the training, validation,
    and test sets in each fold.

    Args:
        num_folds (int): The number of folds to use for cross-validation.
        validation_size (float | int): A ``float`` is the proportion
            (between 0 and 1) of the dataset to include in the validation
            split; an ``int`` is the absolute number of interactions to
            include in the validation split.
    """
    # The validation size goes to the base initializer; only the fold count
    # is kept on this class.
    super().__init__(validation_size)
    self._num_folds = num_folds

omnirec.preprocess.split.UserHoldout(validation_size: float | int, test_size: float | int)

Bases: DataSplit[RawData, SplitData]

Applies the user holdout split to the dataset. Ensures that each user has interactions in the training, validation, and test sets.

Parameters:

Name Type Description Default
validation_size float | int

float: The proportion (between 0 and 1) of the dataset to include in the validation split. int: The absolute number of interactions to include in the validation split.

required
test_size float | int

float: The proportion (between 0 and 1) of the dataset to include in the test split. int: The absolute number of interactions to include in the test split.

required
Source code in src\omnirec\preprocess\split.py
def __init__(self, validation_size: float | int, test_size: float | int) -> None:
    """Applies the user holdout split to the dataset.

    Ensures that each user has interactions in the training, validation,
    and test sets.

    Args:
        validation_size (float | int): A ``float`` is the proportion
            (between 0 and 1) of the dataset to include in the validation
            split; an ``int`` is the absolute number of interactions to
            include in the validation split.
        test_size (float | int): A ``float`` is the proportion (between 0
            and 1) of the dataset to include in the test split; an ``int``
            is the absolute number of interactions to include in the test
            split.
    """
    # The validation size goes to the base initializer; the test size is
    # kept on this class.
    super().__init__(validation_size)
    self._test_size = test_size

omnirec.preprocess.split.TimeBasedHoldout(validation: float | int | pd.Timestamp, test: float | int | pd.Timestamp)

Bases: DataSplit[RawData, SplitData]

Applies a time-based hold-out split to a dataset. Splits the dataset into train, validation and test splits based on the timestamp. The cut-off criteria can be given as proportions, absolute numbers or timestamps.

Parameters:

Name Type Description Default
validation float | int | Timestamp

float: The proportion (between 0 and 1) of newest interactions in the dataset to include in the validation split. int: The absolute number of newest interactions to include in the validation split. pd.Timestamp: The timestamp to use as a cut-off for the validation split. Interactions after this timestamp (newer) are included in the validation split.

required
test float | int | Timestamp

float: The proportion (between 0 and 1) of newest interactions in the dataset to include in the test split. int: The absolute number of newest interactions to include in the test split. pd.Timestamp: The timestamp to use as a cut-off for the test split. Interactions after this timestamp (newer) are included in the test split.

required
Source code in src\omnirec\preprocess\split.py
def __init__(
    self,
    validation: float | int | pd.Timestamp,
    test: float | int | pd.Timestamp,
) -> None:
    """Applies a time-based hold-out split on a dataset.

    Splits the dataset into train, validation and test splits based on the
    timestamp. The cut-off criteria can be proportions, absolute numbers or
    timestamps, but both arguments must be of the same type.

    Args:
        validation (float | int | pd.Timestamp): Cut-off for the validation
            split. A ``float`` is the proportion (between 0 and 1) of newest
            interactions in the dataset to include; an ``int`` is the
            absolute number of newest interactions to include; a
            ``pd.Timestamp`` is the cut-off timestamp, and interactions
            after it (newer) are included in the validation split.
        test (float | int | pd.Timestamp): Cut-off for the test split. A
            ``float`` is the proportion (between 0 and 1) of newest
            interactions in the dataset to include; an ``int`` is the
            absolute number of newest interactions to include; a
            ``pd.Timestamp`` is the cut-off timestamp, and interactions
            after it (newer) are included in the test split.

    Raises:
        SystemExit: If ``validation`` and ``test`` are not of exactly the
            same type. A critical message is logged before exiting with
            status 1.
    """
    # The base initializer gets a literal 0; the validation cut-off is kept
    # in ``_valid_size`` below.
    super().__init__(0)

    # Exact type identity is required, so an int paired with a float is
    # rejected as well.
    types_match = type(validation) is type(test)
    if not types_match:
        self.logger.critical("Validation and test size must be the same type")
        sys.exit(1)

    self._valid_size, self._test_size = validation, test

omnirec.preprocess.pipe.Pipe(*steps: Unpack[tuple[Unpack[Ts], Preprocessor[Any, T]]])

Bases: Generic[T]

Pipeline for automatically applying sequential preprocessing steps. Takes as input a sequence of Preprocessor objects. When process() is called, each step's process method is called in the order in which the steps were provided. Example:

    # Define preprocessing steps
    pipe = Pipe(
        Subsample(0.1),
        MakeImplicit(3),
        CorePruning(5),
        UserCrossValidation(5, 0.1),
    )

    # Apply the steps
    dataset = pipe.process(dataset)

Source code in src\omnirec\preprocess\pipe.py
def __init__(self, *steps: Unpack[tuple[Unpack[Ts], Preprocessor[Any, T]]]) -> None:
    """Pipeline for automatically applying sequential preprocessing steps.

    Takes as input a sequence of Preprocessor objects. When ``process()`` is
    called, each step's ``process`` method is called in the order in which
    the steps were provided.

    Args:
        *steps: The Preprocessor objects to apply, in order.

    Example:
        ```Python
            # Define preprocessing steps
            pipe = Pipe(
                Subsample(0.1),
                MakeImplicit(3),
                CorePruning(5),
                UserCrossValidation(5, 0.1),
            )

            # Apply the steps
            dataset = pipe.process(dataset)
        ```
    """
    super().__init__()
    # Kept as the tuple produced by *steps.
    self._steps = steps