
Coordinator Class

omnirec.runner.coordinator.Coordinator(checkpoint_dir: PathLike | str = Path('./checkpoints'), tmp_dir: Optional[PathLike | str] = None)

Initialize the Coordinator for orchestrating recommendation algorithm experiments. The Coordinator manages the execution of experiments across multiple datasets, algorithms, and configurations. It handles environment isolation, checkpointing, progress tracking, and communication with framework-specific runners.

Parameters:

  • checkpoint_dir (PathLike | str, default: Path('./checkpoints')): Directory for storing persistent experiment data, including model checkpoints, predictions, and progress files. The directory is created if it doesn't exist.
  • tmp_dir (Optional[PathLike | str], default: None): Directory for temporary files such as intermediate CSV exports. If None, a temporary directory is created automatically and cleaned up on exit.

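Note
  • Automatically registers default runners (LensKit, RecBole, RecPack) on initialization
  • Generates SSL certificates (via ensure_certs(), if they are not already present) for secure RPC communication with runner subprocesses
  • The checkpoint directory layout is checkpoint_dir/<dataset-name>-<dataset-hash>/<algorithm>-<config-hash>-<random-state>/, where both hashes are truncated to 8 characters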
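The directory naming can be sketched as follows. This is a minimal illustration, not omnirec's implementation: the real Coordinator.dataset_hash() and Coordinator.config_hash() are not shown on this page, so the SHA-256-over-sorted-JSON hashing, the helper names short_hash and checkpoint_path, and the dataset_fingerprint argument are all hypothetical. Only the path layout mirrors the f-strings in run().

```python
import hashlib
import json
from pathlib import Path


def short_hash(payload: dict, length: int = 8) -> str:
    # Hypothetical hashing: serialize with sorted keys so equal configs hash equally.
    digest = hashlib.sha256(json.dumps(payload, sort_keys=True).encode("utf-8")).hexdigest()
    return digest[:length]


def checkpoint_path(
    checkpoint_dir: Path,
    dataset_name: str,
    dataset_fingerprint: dict,
    algorithm: str,
    config: dict,
    random_state: int,
) -> Path:
    # Layout used by run():
    # <checkpoint_dir>/<dataset-name>-<hash8>/<algorithm>-<hash8>-<random-state>/
    dataset_part = f"{dataset_name}-{short_hash(dataset_fingerprint)}"
    config_part = f"{algorithm}-{short_hash({'algorithm': algorithm, 'config': config})}-{random_state}"
    return checkpoint_dir / dataset_part / config_part
```

Because the random state is part of the directory name, rerunning the same configuration with a different seed gets its own checkpoint directory instead of reusing an existing one.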
Source code in src\omnirec\runner\coordinator.py
def __init__(
    self,
    checkpoint_dir: PathLike | str = Path("./checkpoints"),
    tmp_dir: Optional[PathLike | str] = None,
) -> None:
    """Initialize the Coordinator for orchestrating recommendation algorithm experiments.
    The Coordinator manages the execution of experiments across multiple datasets, algorithms,
    and configurations. It handles environment isolation, checkpointing, progress tracking,
    and communication with framework-specific runners.

    Args:
        checkpoint_dir (PathLike | str, optional): Directory for storing persistent experiment data
            including model checkpoints, predictions, and progress files. Directory is created if it
            doesn't exist. Defaults to "./checkpoints".
        tmp_dir (Optional[PathLike | str], optional): Directory for temporary files such as intermediate
            CSV exports. If None, a temporary directory is created automatically and cleaned up on exit.
            Defaults to None.

    Note:
        - Automatically registers default runners (LensKit, RecBole, RecPack) on initialization
        - Generates SSL certificates for secure RPC communication with runner subprocesses
        - The checkpoint directory structure is: `checkpoint_dir/dataset-hash/config-hash/`
    """
    self._checkpoint_dir = Path(checkpoint_dir)
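    # Always define the attribute so cleanup code can check it safely
    self._tmp_dir_obj: Optional[tempfile.TemporaryDirectory[str]] = None
    if tmp_dir:
        self._tmp_dir = Path(tmp_dir)
    else:
        # No tmp_dir given: create a self-cleaning temporary directory
        self._tmp_dir_obj = tempfile.TemporaryDirectory()
        self._tmp_dir = Path(self._tmp_dir_obj.name)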

    self._out_reader: Optional[OutputReader] = None
    self._err_reader: Optional[OutputReader] = None
    self._root: Optional[_RunnerService] = None

    ensure_certs()
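The tmp_dir fallback above can be isolated into a small sketch. TmpDirHolder and its cleanup() method are hypothetical names used only for illustration; the point is the ownership rule: a caller-supplied directory is never deleted, while a directory created through tempfile.TemporaryDirectory is removed on cleanup (or when the object is finalized).

```python
import tempfile
from os import PathLike
from pathlib import Path
from typing import Optional, Union


class TmpDirHolder:
    """Hypothetical sketch of the tmp_dir handling in Coordinator.__init__."""

    def __init__(self, tmp_dir: Optional[Union[PathLike, str]] = None) -> None:
        # Defined in both branches so cleanup() never hits an AttributeError.
        self._tmp_dir_obj: Optional[tempfile.TemporaryDirectory] = None
        if tmp_dir:
            # Caller-owned directory: left in place by cleanup().
            self._tmp_dir = Path(tmp_dir)
        else:
            # Self-owned directory: deleted by cleanup() or when finalized.
            self._tmp_dir_obj = tempfile.TemporaryDirectory()
            self._tmp_dir = Path(self._tmp_dir_obj.name)

    def cleanup(self) -> None:
        # Only remove the directory if this object created it.
        if self._tmp_dir_obj is not None:
            self._tmp_dir_obj.cleanup()
            self._tmp_dir_obj = None
```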

run(datasets: RecSysDataSet[T] | Iterable[RecSysDataSet[T]], config: ExperimentPlan, evaluator: Evaluator) -> Evaluator

Execute recommendation algorithm experiments across datasets and configurations. Orchestrates the complete experiment lifecycle: environment setup, model training, prediction generation, and evaluation. Supports automatic checkpointing and resuming of interrupted experiments.
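The checkpoint-and-resume idea can be sketched with a small JSON progress file. This is an assumption-laden illustration: omnirec tracks progress through RunProgress, whose file format is not shown here, so the Phase enum, run_with_checkpoints, and the list-of-phase-names JSON format are hypothetical. Only the phase names Fit, Predict, and Eval come from this page.

```python
import json
from enum import Enum
from pathlib import Path
from typing import Callable, Dict, List


class Phase(str, Enum):
    FIT = "Fit"
    PREDICT = "Predict"
    EVAL = "Eval"


def run_with_checkpoints(progress_file: Path, steps: Dict[Phase, Callable[[], None]]) -> List[Phase]:
    """Run Fit -> Predict -> Eval, skipping phases already recorded as done.

    Returns the phases actually executed in this call.
    """
    done = set(json.loads(progress_file.read_text())) if progress_file.exists() else set()
    executed: List[Phase] = []
    for phase in (Phase.FIT, Phase.PREDICT, Phase.EVAL):
        if phase.value in done:
            continue  # completed in an earlier, possibly interrupted, run
        steps[phase]()
        done.add(phase.value)
        # Persist after every phase so a crash loses at most the current phase.
        progress_file.write_text(json.dumps(sorted(done)))
        executed.append(phase)
    return executed
```

If Predict fails, the progress file still records Fit as done, so the next call starts directly at Predict.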

Parameters:

  • datasets (RecSysDataSet[T] | Iterable[RecSysDataSet[T]], required): A single dataset or an iterable of datasets to run experiments on. Each dataset must contain either SplitData (train/val/test) or FoldedData (cross-validation folds); use preprocessing steps to create these splits.
  • config (ExperimentPlan, required): Experiment configuration specifying the algorithms and their hyperparameters. Each algorithm in the plan is executed with every specified parameter combination.
  • evaluator (Evaluator, required): Evaluator instance containing the metrics to compute on predictions. Results are accumulated across all experiments and are accessible via evaluator.get_tables().

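"Every specified parameter combination" means a Cartesian product over the hyperparameter values. The sketch below shows that expansion with itertools.product. It is hypothetical: how ExperimentPlan stores hyperparameters and what config._get_configs() returns internally are not documented here, and the parameter names k and sim are made up.

```python
from itertools import product
from typing import Any, Dict, List


def expand_grid(param_grid: Dict[str, List[Any]]) -> List[Dict[str, Any]]:
    """Expand {"k": [10, 20], "sim": ["cosine"]} into one dict per combination."""
    names = sorted(param_grid)  # fixed key order gives deterministic output
    return [dict(zip(names, values)) for values in product(*(param_grid[n] for n in names))]
```

For example, two values of k and two similarity measures yield four configurations, and each one would become its own run with its own checkpoint directory.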

Returns:

  • Evaluator: The same evaluator instance that was passed in, now containing results from all experiments. Use evaluator.get_tables() to retrieve formatted result tables.

Raises:

  • SystemExit: If the experiment plan is empty, if a dataset contains neither SplitData nor FoldedData, or if runner/algorithm validation fails.
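SystemExit derives from BaseException rather than Exception, so the `except Exception` handler in run() does not swallow it, while the `finally` clause still stops the runner. The toy loop below (process is a hypothetical name, not part of omnirec) reproduces that control flow: an ordinary error is logged and the loop continues, whereas sys.exit() skips the handler, runs the cleanup, and propagates.

```python
import sys
from typing import Iterable, List


def process(items: Iterable[str], log: List[str]) -> None:
    for item in items:
        try:
            if item == "invalid":
                sys.exit(1)  # like the invalid-dataset branch in run()
            if item == "broken":
                raise ValueError(item)  # an ordinary runner/algorithm failure
            log.append(f"ok:{item}")
        except Exception as exc:
            # Ordinary errors are recorded and the loop moves on.
            log.append(f"error:{exc}")
        finally:
            # Cleanup runs in every case, including SystemExit.
            log.append(f"stop:{item}")
```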

Note
  • Each algorithm runs in an isolated Python environment with framework-specific dependencies
  • Progress is checkpointed after each phase (Fit, Predict, Eval) for fault tolerance
  • Identical dataset/config combinations are cached and skipped automatically
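  • For cross-validation (FoldedData), experiments run sequentially across all folds, resuming from the first fold that has not yet completed
  • Runner subprocesses are automatically started and terminated for each algorithm
  • If an exception occurs while an algorithm is running, its traceback is printed, the runner is stopped, and execution continues with the next algorithm
  • Evaluation results are loaded from checkpoint_dir/results.json at the start (if the file exists) and written back after all algorithms have finished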
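Fold resumption follows a simple pattern: start the loop at the next unfinished fold and advance the counter only after a fold completes. In the sketch below, FoldProgress is a hypothetical stand-in for RunProgress (whose get_next_fold_or_init, advance_fold, and reset_phase methods are called in the source but not documented here), and folds are plain strings instead of real fold data.

```python
from dataclasses import dataclass
from typing import Callable, List, Sequence


@dataclass
class FoldProgress:
    """Hypothetical stand-in for RunProgress: remembers the next fold to run."""

    next_fold: int = 0

    def advance_fold(self) -> None:
        self.next_fold += 1


def run_folds(folds: Sequence[str], progress: FoldProgress, run_split: Callable[[str, int], None]) -> List[int]:
    executed: List[int] = []
    # Resume at the first unfinished fold instead of fold 0.
    for fold in range(progress.next_fold, len(folds)):
        run_split(folds[fold], fold)
        progress.advance_fold()  # only reached if the fold completed
        executed.append(fold)
    return executed
```

In the real loop, advance_fold() is followed by reset_phase(), so the next fold starts again at the Fit phase.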
Source code in src\omnirec\runner\coordinator.py
def run(
    self,
    datasets: RecSysDataSet[T] | Iterable[RecSysDataSet[T]],
    config: ExperimentPlan,
    evaluator: Evaluator,  # TODO: Make optional
) -> Evaluator:
    """Execute recommendation algorithm experiments across datasets and configurations.
    Orchestrates the complete experiment lifecycle: environment setup, model training,
    prediction generation, and evaluation. Supports automatic checkpointing and resuming
    of interrupted experiments.

    Args:
        datasets (RecSysDataSet[T] | Iterable[RecSysDataSet[T]]): Single dataset or list of datasets
            to run experiments on. Datasets must contain either SplitData (train/val/test) or
            FoldedData (cross-validation folds). Use preprocessing steps to create these splits.
        config (ExperimentPlan): Experiment configuration specifying algorithms and their hyperparameters.
            Each algorithm in the plan will be executed with all specified parameter combinations.
        evaluator (Evaluator): Evaluator instance containing metrics to compute on predictions.
            Results are accumulated across all experiments and accessible via `evaluator.get_tables()`.

    Returns:
        Evaluator: The same evaluator instance passed in, now containing results from all experiments.
            Use `evaluator.get_tables()` to retrieve formatted result tables.

    Raises:
        SystemExit: If the experiment plan is empty or if runner/algorithm validation fails.

    Note:
        - Each algorithm runs in an isolated Python environment with framework-specific dependencies
        - Progress is checkpointed after each phase (Fit, Predict, Eval) for fault tolerance
        - Identical dataset/config combinations are cached and skipped automatically
        - For cross-validation (FoldedData), experiments run sequentially across all folds
        - Runner subprocesses are automatically started and terminated for each algorithm
    """
    # TODO: Force fit, pred, eval parameters to overwrite status tracker
    # TODO: Dataset Normalization stuff etc. beforehand
    exception_occurred = False

    if not isinstance(datasets, Iterable):
        datasets = [datasets]

    algorithm_configs = config._get_configs()
    if len(algorithm_configs) == 0:
        logger.critical(
            "Empty configuration. You have to add at least one experiment!"
        )
        sys.exit(1)

    self._evaluator = evaluator
    self._results_path = self._checkpoint_dir / "results.json"
    if self._results_path.exists():
        self._evaluator.load_results(self._results_path)

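    for current_algo, current_config_list in algorithm_configs:
        # Reset per algorithm so a failure in one run does not affect how later runs are stopped
        exception_occurred = False
        try: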
            host, port = self.start_runner(current_algo)

            logger.info("Connecting to runner...")
            conn = rpyc.ssl_connect(
                host,
                port,
                get_key_pth(Side.Client),
                get_cert_pth(Side.Client),
            )
            root: _RunnerService = conn.root
            self._root = conn.root

            for current_dataset in datasets:
                for current_config in current_config_list:
                    dataset_namehash = f"{current_dataset._meta.name}-{self.dataset_hash(current_dataset)[:8]}"
                    config_namehash = f"{current_algo}-{self.config_hash(current_algo, current_config)[:8]}-{util.get_random_state()}"
                    current_checkpoint_dir = (
                        self._checkpoint_dir / dataset_namehash / config_namehash
                    )
                    current_checkpoint_dir.mkdir(parents=True, exist_ok=True)
                    logger.debug(f"Using checkpoint dir: {current_checkpoint_dir}")

                    current_tmp_dir = (
                        self._tmp_dir / dataset_namehash / config_namehash
                    )

                    current_tmp_dir.mkdir(parents=True, exist_ok=True)
                    logger.debug(f"Using tmp dir: {current_tmp_dir}")

                    progress = RunProgress.load_or_create(
                        self._checkpoint_dir,
                        (dataset_namehash, config_namehash, current_config),
                    )

                    if isinstance(current_dataset._data, FoldedData):

                        def get_next_fold():
                            return progress.get_next_fold_or_init(
                                dataset_namehash, config_namehash
                            )

                        def reset_phase():
                            progress.reset_phase(dataset_namehash, config_namehash)

                        def advance_fold():
                            progress.advance_fold(dataset_namehash, config_namehash)

                        next_fold = get_next_fold()

                        for fold in range(
                            next_fold, len(current_dataset._data.folds)
                        ):
                            fold_data = current_dataset._data.folds[fold]

                            files = self.get_file_paths(
                                current_checkpoint_dir, current_tmp_dir, fold
                            )
                            util.splits_to_csv(files[:3], fold_data)

                            self.run_split(
                                root,
                                progress,
                                current_algo,
                                current_config,
                                current_dataset._meta.name,
                                dataset_namehash,
                                config_namehash,
                                *files,
                                fold,
                            )

                            advance_fold()
                            reset_phase()
                    elif isinstance(current_dataset._data, SplitData):
                        files = self.get_file_paths(
                            current_checkpoint_dir, current_tmp_dir
                        )
                        util.splits_to_csv(files[:3], current_dataset._data)

                        self.run_split(
                            root,
                            progress,
                            current_algo,
                            current_config,
                            current_dataset._meta.name,
                            dataset_namehash,
                            config_namehash,
                            *files,
                        )
                    else:
                        logger.critical(
                            "Invalid dataset variant. Dataset has to be either FoldedData or SplitData. Apply a datasplit beforehand"
                        )
                        sys.exit(1)

        except Exception:
            traceback.print_exc()
            exception_occurred = True
        finally:
            if exception_occurred:
                self.stop()
            else:
                self.stop(logger.info)
            # print(self._proc.returncode) # TODO: Handle bad return code?

    self._evaluator.save_results(self._results_path)
    return self._evaluator
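The results.json handling at the start and end of run() follows a load-accumulate-save pattern. The sketch below is hypothetical: ToyEvaluator only borrows the method names load_results and save_results seen in the listing and is not omnirec's Evaluator, whose storage format is not documented here; run_session stands in for the whole experiment loop. As in the listing, results are written only once, after all work is done.

```python
import json
from pathlib import Path
from typing import Dict


class ToyEvaluator:
    """Hypothetical evaluator that accumulates metric values per experiment key."""

    def __init__(self) -> None:
        self.results: Dict[str, Dict[str, float]] = {}

    def load_results(self, path: Path) -> None:
        self.results.update(json.loads(path.read_text()))

    def save_results(self, path: Path) -> None:
        path.parent.mkdir(parents=True, exist_ok=True)
        path.write_text(json.dumps(self.results, indent=2, sort_keys=True))


def run_session(checkpoint_dir: Path, evaluator: ToyEvaluator, new_results: Dict[str, Dict[str, float]]) -> ToyEvaluator:
    results_path = checkpoint_dir / "results.json"
    if results_path.exists():
        evaluator.load_results(results_path)  # keep results from earlier sessions
    evaluator.results.update(new_results)  # stands in for the experiment loop
    evaluator.save_results(results_path)
    return evaluator
```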