Tuning API

Tuning APIs connect pipelines, validators, search spaces, and optional experiment tracking.

Tuner is the main programmatic interface. It evaluates candidate pipeline configurations through a CrossValidator, using Optuna for parameter suggestions and XDFlow's pipeline contracts for cloning, caching, and nested parameter setting.

Tuner

Tuner(pipelines_to_tune: Pipeline | list[Pipeline], cv_strategy: CrossValidator, param_grid: dict[str, dict[str, dict[str, Any]]], initial_data_container: DataContainer, sampler: BaseSampler | None = None, pruner: BasePruner | None = None, direction: str = 'maximize', verbose: int = 1, use_mlflow: bool = True, mlflow_experiment_name: str = '', mlflow_metadata: dict[str, Any] | None = None, log_artifacts: bool = True, random_seed: int = 0, use_cache: bool = True, verbose_transforms: bool = False, exclude_intertrial_from_scoring: bool | None = None)

Orchestrates hyperparameter tuning for pipelines and cross-validators.

The Tuner class orchestrates hyperparameter optimization using a "configuration-driven" approach. Users specify which pipelines and parameters to test, and the Tuner translates this into an Optuna study.

The underlying CrossValidator automatically optimizes pipeline execution by detecting and separating stateless/stateful components, providing both a simple API and efficient execution.

Features:

- High-level wrapper around Optuna optimization
- Support for multiple pipeline architectures
- Flexible parameter space specification
- Integration with CrossValidator for evaluation
- Automatic pipeline optimization via CrossValidator
- Comprehensive random seed management for reproducibility

Initialize Tuner with pipelines and search space.

Parameters:

Name Type Description Default
pipelines_to_tune Pipeline | list[Pipeline]

A single Pipeline or a list of Pipeline objects to compare and tune. A single Pipeline is wrapped in a list internally. The parameter values already set on these instances are used for the initial trials, and any parameter not overridden by param_grid keeps its value.

required
cv_strategy CrossValidator

CrossValidator instance for evaluation

required
param_grid dict[str, dict[str, dict[str, Any]]]

Nested dict defining parameter search spaces Format: {pipeline_name: {step_name: {param_name: space}}}

required
initial_data_container DataContainer

DataContainer with data for optimization

required
sampler BaseSampler | None

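Optuna sampler for hyperparameter optimization. When None, a TPESampler is used.

None
pruner BasePruner | None

Optuna pruner. When None, pruning is disabled (optuna.pruners.NopPruner is used).

None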
direction str

Direction to optimize ("maximize" or "minimize")

'maximize'
verbose int

Verbosity level (0, 1, or 2)

1
use_mlflow bool

Whether to use MLflow for experiment tracking

True
mlflow_experiment_name str

Name of the MLflow experiment

''
mlflow_metadata dict[str, Any] | None

Additional metadata to log to MLflow

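None
log_artifacts bool

Whether to log artifacts to MLflow, such as the confusion-matrix plot produced when scoring on the holdout set.

True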
random_seed int

Global random seed for reproducibility.

0
use_cache bool

Whether to use caching for the static part of the pipeline.

True
verbose_transforms bool

Whether to enable verbose logging in transforms during tuning

False
exclude_intertrial_from_scoring bool | None

Optional override. When not None, its value is written to the validator's exclude_intertrial_from_scoring flag (True makes tuning scores ignore synthetic blanks). An AttributeError is raised if the validator does not expose that flag.

None
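
The nested param_grid format can be pictured as a flat set of parameter names. The sketch below is illustrative only: the pipeline, step, and parameter names are hypothetical, the list and tuple "spaces" are placeholders (the exact search-space objects XDFlow accepts are not shown on this page), and flatten_param_grid is a helper written for this example, not part of XDFlow. The double-underscore naming follows the "pipeline_name__step__param" convention visible in get_best_pipeline.

```python
from typing import Any

# Hypothetical search space: every name and every "space" value is a placeholder.
param_grid: dict[str, dict[str, dict[str, Any]]] = {
    "svm_pipeline": {
        "scaler": {"with_mean": [True, False]},
        "classifier": {"C": (0.01, 100.0)},
    },
    "lda_pipeline": {
        "classifier": {"shrinkage": (0.0, 1.0)},
    },
}


def flatten_param_grid(grid: dict[str, dict[str, dict[str, Any]]]) -> dict[str, Any]:
    """Flatten {pipeline: {step: {param: space}}} into 'pipeline__step__param' keys."""
    flat: dict[str, Any] = {}
    for pipeline_name, steps in grid.items():
        for step_name, params in steps.items():
            for param_name, space in params.items():
                flat[f"{pipeline_name}__{step_name}__{param_name}"] = space
    return flat


flat_grid = flatten_param_grid(param_grid)
for name in flat_grid:
    print(name)
```

Keying the outer dictionary by pipeline name lets several candidate pipelines share one grid while each keeps its own tunable steps.
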
Source code in xdflow/tuning/base.py
def __init__(
    self,
    pipelines_to_tune: Pipeline | list[Pipeline],
    cv_strategy: CrossValidator,
    param_grid: dict[str, dict[str, dict[str, Any]]],
    initial_data_container: DataContainer,
    sampler: BaseSampler | None = None,
    pruner: optuna.pruners.BasePruner | None = None,
    direction: str = "maximize",
    verbose: int = 1,
    use_mlflow: bool = True,
    mlflow_experiment_name: str = "",
    mlflow_metadata: dict[str, Any] | None = None,
    log_artifacts: bool = True,
    random_seed: int = 0,
    use_cache: bool = True,
    verbose_transforms: bool = False,
    exclude_intertrial_from_scoring: bool | None = None,
):
    """
    Initialize Tuner with pipelines and search space.

    Args:
        pipelines_to_tune: List of Pipeline objects to compare/tune.
                          Initial parameter values from these instances will be used for the initial trials.
                          Parameter values will be kept if not overwritten by the param_grid.
        cv_strategy: CrossValidator instance for evaluation
        param_grid: Nested dict defining parameter search spaces
                   Format: {pipeline_name: {step_name: {param_name: space}}}
        initial_data_container: DataContainer with data for optimization
        sampler: Optuna sampler for hyperparameter optimization
        direction: Direction to optimize ("maximize" or "minimize")
        verbose: Verbosity level (0, 1, or 2)
        use_mlflow: Whether to use MLflow for experiment tracking
        mlflow_experiment_name: Name of the MLflow experiment
        mlflow_metadata: Additional metadata to log to MLflow
        random_seed: Global random seed for reproducibility.
        use_cache: Whether to use caching for the static part of the pipeline.
        verbose_transforms: Whether to enable verbose logging in transforms during tuning
        exclude_intertrial_from_scoring: Optional toggle that, when set, overrides the validator's
            exclude_intertrial_from_scoring flag so tuning scores ignore synthetic blanks.
    """
    if isinstance(pipelines_to_tune, Pipeline):
        pipelines_to_tune = [pipelines_to_tune]

    self.pipelines_to_tune = {p.name: p for p in pipelines_to_tune}
    self.cv_strategy = cv_strategy
    self.param_grid = param_grid
    self.initial_data = initial_data_container
    self.sampler = sampler or TPESampler()
    # Disable pruning by default; users can still supply a custom pruner
    self.pruner = pruner or optuna.pruners.NopPruner()
    self.direction = direction
    self.use_mlflow = use_mlflow
    self.mlflow_experiment_name = mlflow_experiment_name
    self.mlflow_metadata = mlflow_metadata or {}
    self.log_artifacts = log_artifacts
    self.verbose = verbose
    self.random_seed = random_seed
    self.use_cache = use_cache
    self.verbose_transforms = verbose_transforms
    self.exclude_intertrial_from_scoring = exclude_intertrial_from_scoring

    if self.use_cache:
        for pipeline in self.pipelines_to_tune.values():
            pipeline.use_cache = True

    if self.exclude_intertrial_from_scoring is not None:
        if not hasattr(self.cv_strategy, "exclude_intertrial_from_scoring"):
            raise AttributeError(
                f"{type(self.cv_strategy).__name__} does not expose 'exclude_intertrial_from_scoring'."
            )
        self.cv_strategy.exclude_intertrial_from_scoring = self.exclude_intertrial_from_scoring

    self._validate_param_grid()

    # MLflow run tracking
    self.current_run_id = None

    # set random seeds
    self._set_random_seeds()

    # set up mlflow
    if self.use_mlflow:
        self._setup_mlflow()

    if self.verbose == 0:
        self.set_low_verbosity()

tune

tune(n_trials: int = 50, show_progress_bar: bool = False, run_name: str | None = None) -> tuple[dict[str, Any], float]

Runs the hyperparameter tuning study.

The first trial will always use the original parameter values from the pipeline instances as the starting point, ensuring that the optimization can evaluate the baseline configuration.

Parameters:

Name Type Description Default
n_trials int

Number of optimization trials to run

50
show_progress_bar bool

Whether to show progress bar during optimization

False
run_name str | None

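Custom name for the MLflow run. The run name always starts with seed_<random_seed>; a custom name, if given, is appended after a double underscore.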

None

Returns:

Type Description
tuple[dict[str, Any], float]

Tuple of (best_parameters, best_score)

Source code in xdflow/tuning/base.py
def tune(
    self,
    n_trials: int = 50,
    show_progress_bar: bool = False,
    run_name: str | None = None,
) -> tuple[dict[str, Any], float]:
    """
    Runs the hyperparameter tuning study.

    The first trial will always use the original parameter values from the
    pipeline instances as the starting point, ensuring that the optimization
    can evaluate the baseline configuration.

    Args:
        n_trials: Number of optimization trials to run
        show_progress_bar: Whether to show progress bar during optimization
        run_name: Custom name for the MLflow run

    Returns:
        Tuple of (best_parameters, best_score)
    """

    run_name = f"seed_{self.random_seed}__{run_name}" if run_name else f"seed_{self.random_seed}"

    if self.use_mlflow:
        with mlflow.start_run(run_name=run_name) as parent_run:
            self._log_initial_params()

            # Create callback after parent run is established
            mlflow_callback = self._create_mlflow_callback()
            self._tune(n_trials, show_progress_bar, callbacks=[mlflow_callback])
            self.current_run_id = parent_run.info.run_id
    else:
        self._tune(n_trials, show_progress_bar, callbacks=None)

    best_params = self.study.best_params
    best_score = self.study.best_value

    print("Best Parameters:", best_params)
    print("Best Score:", best_score)

    if self.use_mlflow:
        # Log additional metrics if there's an active run
        try:
            with mlflow.start_run(run_id=self.current_run_id):
                mlflow.log_params(best_params)
                mlflow.log_metric("best_val_score", best_score)
        except Exception as e:
            print(f"Warning: Could not log to MLflow: {e}")

    return best_params, best_score

prepare_pipeline_with_caching

prepare_pipeline_with_caching(pipeline: Pipeline) -> tuple[Pipeline, DataContainer]

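Prepares a pipeline for tuning by splitting it at the index computed from its param_grid entry. The steps before the split form a static pipeline that is fitted and run once on the initial data; the remaining steps form the dynamic pipeline. Returns the dynamic pipeline together with the data it should receive (the static pipeline's output, or the initial data when there is no static part).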

Source code in xdflow/tuning/base.py
def prepare_pipeline_with_caching(self, pipeline: Pipeline) -> tuple[Pipeline, DataContainer]:
    """
    Prepares for tuning by splitting pipelines and caching static parts.
    """
    pipeline_param_grid = self.param_grid.get(pipeline.name, {})
    split_index = self._find_split_index(pipeline, pipeline_param_grid)

    if split_index > 0:
        static_pipeline = Pipeline(
            name=f"{pipeline.name}_static", steps=pipeline.steps[:split_index], use_cache=True
        )
        # Execute the static part and cache the data
        data = static_pipeline.fit_transform(self.initial_data, verbose=self.verbose_transforms)
    else:
        # No static part, use initial data
        data = self.initial_data

    # The dynamic part of the pipeline
    dynamic_pipeline = Pipeline(name=f"{pipeline.name}_dynamic", steps=pipeline.steps[split_index:])
    # Mark this pipeline as the post-cache pipeline for clearer logging downstream
    dynamic_pipeline.is_post_cache_pipeline = True
    try:
        dynamic_pipeline.cached_prefix_step_names = [
            (step.name if hasattr(step, "name") else step[0]) for step in pipeline.steps[:split_index]
        ]
    except Exception:
        dynamic_pipeline.cached_prefix_step_names = []
    dynamic_pipeline.origin_pipeline_name = pipeline.name

    return dynamic_pipeline, data
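
The static/dynamic split can be sketched with plain step names. This is a minimal illustration under one stated assumption: the split happens at the first step that has an entry in the pipeline's parameter grid. The real _find_split_index is not shown on this page and may apply additional rules; find_split_index, split_steps, and the step names here are hypothetical.

```python
def find_split_index(step_names: list[str], tuned_step_names: set[str]) -> int:
    """Return the index of the first tuned step, or len(step_names) if none is tuned.

    Assumption: everything before the first tuned step is safe to run once and cache.
    """
    for index, name in enumerate(step_names):
        if name in tuned_step_names:
            return index
    return len(step_names)


def split_steps(step_names: list[str], pipeline_param_grid: dict) -> tuple[list[str], list[str]]:
    """Split step names into a static prefix and a dynamic remainder."""
    split_index = find_split_index(step_names, set(pipeline_param_grid))
    return step_names[:split_index], step_names[split_index:]


steps = ["load", "filter", "scale", "classify"]
grid = {"scale": {"with_mean": [True, False]}, "classify": {"C": (0.01, 100.0)}}

static_steps, dynamic_steps = split_steps(steps, grid)
print("static:", static_steps)
print("dynamic:", dynamic_steps)
```

Only the dynamic steps need to be re-run for each trial, which is why caching the static prefix saves work when many trials share the same early steps.
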

get_best_pipeline

get_best_pipeline() -> Pipeline

Reconstructs the best pipeline from the optimization results.

Returns:

Type Description
Pipeline

Complete Pipeline object configured with best parameters

Raises:

Type Description
ValueError

If tuning hasn't been run yet

Source code in xdflow/tuning/base.py
def get_best_pipeline(self) -> Pipeline:
    """
    Reconstructs the best pipeline from the optimization results.

    Returns:
        Complete Pipeline object configured with best parameters

    Raises:
        ValueError: If tuning hasn't been run yet
    """
    if self.study is None:
        raise ValueError("Must run tune() before getting best pipeline")

    best_params = self.study.best_trial.params

    # Get the best pipeline name
    pipeline_name = best_params["pipeline"]
    best_pipeline = self.pipelines_to_tune[pipeline_name].clone()

    # Apply the best parameters to the complete pipeline
    # Strip the pipeline name prefix from parameter names and normalize legacy
    # 'choice_' segments introduced by older SwitchTransform naming.
    params_to_set = {}

    def _normalize_choice_segments(name: str) -> str:
        parts = name.split("__")
        normalized_parts = [p[7:] if p.startswith("choice_") else p for p in parts]
        return "__".join(normalized_parts)

    for param_name, param_value in best_params.items():
        if param_name == "pipeline":
            continue
        # Remove pipeline name prefix: "pipeline_name__step__param" ->
        # "step__param"
        if param_name.startswith(f"{pipeline_name}__"):
            stripped_name = param_name[len(f"{pipeline_name}__") :]
            normalized_name = _normalize_choice_segments(stripped_name)
            params_to_set[normalized_name] = param_value
        else:
            params_to_set[_normalize_choice_segments(param_name)] = param_value

    best_pipeline.set_params(**params_to_set)

    return best_pipeline
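
The parameter-name handling in get_best_pipeline can be tried in isolation. The sketch below mirrors the prefix stripping and 'choice_' normalization from the source above; the function names and the sample best_params dictionary are hypothetical, and no XDFlow or Optuna objects are involved.

```python
from typing import Any


def normalize_choice_segments(name: str) -> str:
    """Drop a leading 'choice_' from each '__'-separated segment."""
    parts = name.split("__")
    return "__".join(p[len("choice_"):] if p.startswith("choice_") else p for p in parts)


def params_for_best_pipeline(best_params: dict[str, Any]) -> tuple[str, dict[str, Any]]:
    """Return the winning pipeline name and the step-level parameters to set on it."""
    pipeline_name = best_params["pipeline"]
    prefix = f"{pipeline_name}__"
    params_to_set: dict[str, Any] = {}
    for name, value in best_params.items():
        if name == "pipeline":
            continue  # the selector itself is not a pipeline parameter
        stripped = name[len(prefix):] if name.startswith(prefix) else name
        params_to_set[normalize_choice_segments(stripped)] = value
    return pipeline_name, params_to_set


# Hypothetical trial parameters in the 'pipeline_name__step__param' layout.
sample_best_params = {
    "pipeline": "svm_pipeline",
    "svm_pipeline__classifier__C": 1.5,
    "svm_pipeline__reducer__choice_pca__n_components": 8,
}

best_name, step_params = params_for_best_pipeline(sample_best_params)
print(best_name, step_params)
```
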

finalize_best_pipeline

finalize_best_pipeline(data_container: DataContainer | None = None, verbose: bool = False) -> Pipeline

Fit the best pipeline on the full dataset to produce a finalized model.

Parameters:

Name Type Description Default
data_container DataContainer | None

DataContainer to use for the final fit. Defaults to the initial data.

None
verbose bool

Whether to enable verbose logging in transforms during the final fit.

False

Returns:

Type Description
Pipeline

A fitted Pipeline ready for inference.

Source code in xdflow/tuning/base.py
def finalize_best_pipeline(
    self,
    data_container: DataContainer | None = None,
    verbose: bool = False,
) -> Pipeline:
    """
    Fit the best pipeline on the full dataset to produce a finalized model.

    Args:
        data_container: DataContainer to use for the final fit. Defaults to the initial data.
        verbose: Whether to enable verbose logging in transforms during the final fit.

    Returns:
        A fitted Pipeline ready for inference.
    """
    if self.study is None:
        raise ValueError("Must run tune() before finalizing the best pipeline")

    container = data_container or self.initial_data
    best_pipeline = self.get_best_pipeline()

    validator = copy.deepcopy(self.cv_strategy)
    validator.set_pipeline(best_pipeline)

    return validator.finalize_pipeline(container, verbose=verbose)

score_best_pipeline_on_holdout

score_best_pipeline_on_holdout(return_validator: bool = False)

Score the best pipeline on the holdout set.

Returns:

Type Description

Holdout test score for the best pipeline. When return_validator is True, a tuple of (score, validator) is returned instead.

Source code in xdflow/tuning/base.py
def score_best_pipeline_on_holdout(self, return_validator: bool = False):
    """
    Score the best pipeline on the holdout set.

    Returns:
        Holdout test score for the best pipeline
    """
    best_pipeline = self.get_best_pipeline()

    if self.use_cache:
        best_pipeline, initial_data = self.prepare_pipeline_with_caching(best_pipeline)
    else:
        initial_data = self.initial_data

    validator = copy.deepcopy(self.cv_strategy)
    validator.set_pipeline(best_pipeline)  # Assign the best complete pipeline
    score = validator.score_on_holdout(initial_data, verbose=self.verbose_transforms)

    if self.use_mlflow and self.current_run_id is not None:
        # Reopen the run to log holdout metrics
        with mlflow.start_run(run_id=self.current_run_id):
            mlflow.log_metric("test_score", score)

            # Log confusion matrix
            if self.log_artifacts:
                artifact_path = "confusion_matrix.png"
                validator.plot_confusion_matrix(
                    use_holdout=True, normalize=True, save_as=artifact_path, show_plot=False
                )
                mlflow.log_artifact(artifact_path, "plots")

    if return_validator:
        return score, validator
    else:
        return score

Helper Utilities

run_tuning_pipeline is a higher-level helper for running tuning over one or more prebuilt pipelines and returning finalized pipelines.

run_tuning_pipeline

run_tuning_pipeline(pipelines_to_tune: Pipeline | list[Pipeline], cv_strategy: CrossValidator, param_grid: dict[str, dict[str, dict[str, Any]]], initial_data_container: DataContainer, experiment_name: str | None = None, mlflow_metadata: dict[str, Any] | None = None, n_seeds: int = 1, n_trials: int = 10, plot_importances: bool = False, plot_combined_conf_matrix: bool = True, plot_each_seed_conf_matrix: bool = False, scoring_mask_func: Any | None = None, exclude_intertrial_from_scoring: bool | None = None, holdout_ids: Sequence[Any] | None = None, n_holdouts: int | None = 1, holdout_chunk_seed: int | None = 0, return_pipelines: bool = True, score_on_holdout: bool = True, log_trial_params: bool = False, log_artifacts: bool = True, **kwargs)

Run a tuning pipeline and return the finalized pipelines.

Parameters:

Name Type Description Default
pipelines_to_tune Pipeline | list[Pipeline]

The pipelines to tune.

required
cv_strategy CrossValidator

The cross-validator to use.

required
param_grid dict[str, dict[str, dict[str, Any]]]

The parameter grid to use.

required
initial_data_container DataContainer

The initial data container to use.

required
experiment_name str | None

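The name of the MLflow experiment, passed to the Tuner as mlflow_experiment_name. It is also used in the title of the combined confusion-matrix plot.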

None
mlflow_metadata dict[str, Any] | None

The metadata to use for mlflow.

None
n_seeds int

The number of tuning runs. Each run builds a fresh Tuner with random_seed set to 0, 1, ..., n_seeds - 1.

1
n_trials int

The number of Optuna trials to run per seed.

10
plot_importances bool

Whether to plot the importances.

False
plot_combined_conf_matrix bool

Whether to plot the combined confusion matrix.

True
plot_each_seed_conf_matrix bool

Whether to plot the confusion matrix for each seed.

False
scoring_mask_func Any | None

Optional function to compute scoring mask for container-aware scorers. Should have signature: (DataContainer) -> np.ndarray (boolean mask). Used to filter confusion matrices to match the scorer's logic.

None
exclude_intertrial_from_scoring bool | None

Optional toggle propagated to each validator so tuning ignores synthetic intertrial segments when scoring folds/holdouts.

None
holdout_ids Sequence[Any] | None

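Optional sequence of IDs eligible for holdout (e.g., session IDs). When provided, the pool is shuffled once using holdout_chunk_seed and each seed receives its own disjoint chunk of n_holdouts IDs, so at least n_seeds * n_holdouts IDs are required. The chunk is assigned to the first attribute the validator exposes among test_group_ids, test_session_ids, and test_animal_ids, overwriting any existing value.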

None
n_holdouts int | None

Number of holdout IDs to sample per seed. Required if holdout_ids is set.

1
holdout_chunk_seed int | None

Optional RNG seed controlling holdout sampling.

0
return_pipelines bool

Whether to return the finalized pipelines.

True
score_on_holdout bool

When False, skip the holdout eval step and rely on cross-validation scores instead (useful for CV-only tuning flows).

True
log_trial_params bool

When True, print every Optuna trial's parameter dictionary and score after tuning completes. Helpful for debugging the search space.

False
log_artifacts bool

Passed to the Tuner as log_artifacts.

True
**kwargs

Additional keyword arguments to pass to the Tuner.

{}

Returns:

Type Description

A list with one finalized pipeline per seed when return_pipelines is True; otherwise None.

Source code in xdflow/tuning/tuner_utils.py
def run_tuning_pipeline(
    pipelines_to_tune: Pipeline | list[Pipeline],
    cv_strategy: CrossValidator,
    param_grid: dict[str, dict[str, dict[str, Any]]],
    initial_data_container: DataContainer,
    experiment_name: str | None = None,
    mlflow_metadata: dict[str, Any] | None = None,
    n_seeds: int = 1,
    n_trials: int = 10,
    plot_importances: bool = False,
    plot_combined_conf_matrix: bool = True,
    plot_each_seed_conf_matrix: bool = False,
    scoring_mask_func: Any | None = None,
    exclude_intertrial_from_scoring: bool | None = None,
    holdout_ids: Sequence[Any] | None = None,
    n_holdouts: int | None = 1,
    holdout_chunk_seed: int | None = 0,
    return_pipelines: bool = True,
    score_on_holdout: bool = True,
    log_trial_params: bool = False,
    log_artifacts: bool = True,
    **kwargs,
):
    """
    Run a tuning pipeline and return the finalized pipelines.

    Args:
        pipelines_to_tune: The pipelines to tune.
        cv_strategy: The cross-validator to use.
        param_grid: The parameter grid to use.
        initial_data_container: The initial data container to use.
        experiment_name: The name of the experiment.
        mlflow_metadata: The metadata to use for mlflow.
        n_seeds: The number of seeds to use.
        n_trials: The number of trials to use.
        plot_importances: Whether to plot the importances.
        plot_combined_conf_matrix: Whether to plot the combined confusion matrix.
        plot_each_seed_conf_matrix: Whether to plot the confusion matrix for each seed.
        scoring_mask_func: Optional function to compute scoring mask for container-aware scorers.
                          Should have signature: (DataContainer) -> np.ndarray (boolean mask).
                          Used to filter confusion matrices to match the scorer's logic.
        exclude_intertrial_from_scoring: Optional toggle propagated to each validator so tuning ignores
            synthetic intertrial segments when scoring folds/holdouts.
        holdout_ids: Optional sequence of IDs eligible for holdout (e.g., session IDs).
            When provided alongside n_holdouts, each seed samples `n_holdouts`
            unique IDs from this pool (without replacement within the sample) and
            assigns them to the validator's holdout attribute.
        n_holdouts: Number of holdout IDs to sample per seed. Required if holdout_ids is set.
        holdout_chunk_seed: Optional RNG seed controlling holdout sampling.
        return_pipelines: Whether to return the finalized pipelines.
        score_on_holdout: When False, skip the holdout eval step and rely on cross-validation
            scores instead (useful for CV-only tuning flows).
        log_trial_params: When True, print every Optuna trial's parameter dictionary and score
            after tuning completes. Helpful for debugging the search space.
        **kwargs: Additional keyword arguments to pass to the Tuner.

    Returns:
        The finalized pipelines.
    """
    cms = []
    test_scores = []
    test_trues = []
    finalized_pipelines = []
    class_labels: list[Any] | None = None
    is_classification = None  # Will be set in first iteration
    metric_name = None  # For regression tasks

    holdout_pool: list[Any] | None = None
    holdout_sampler: random.Random | None = None
    holdout_assignment: list[Any] | None = None
    holdout_pointer = 0
    if holdout_ids is not None:
        if n_holdouts is None:
            raise ValueError("n_holdouts must be provided when holdout_ids is set.")
        holdout_pool = list(holdout_ids)
        if len(holdout_pool) < n_holdouts:
            raise ValueError(f"Requested n_holdouts={n_holdouts} but only {len(holdout_pool)} holdout_ids provided.")
        holdout_sampler = random.Random(holdout_chunk_seed)
        holdout_assignment = holdout_pool[:]
        holdout_sampler.shuffle(holdout_assignment)
        total_needed = n_seeds * n_holdouts
        if len(holdout_assignment) < total_needed:
            raise ValueError(
                f"Need {total_needed} unique holdout IDs for {n_seeds} seeds "
                f"(n_holdouts={n_holdouts}) but only {len(holdout_assignment)} provided."
            )
        warnings.warn(
            f"Randomly assigning {n_holdouts} holdout IDs for each seed. Current holdout_ids will be overwritten."
        )

    def _assign_holdout_ids(validator: CrossValidator, holdout_ids: list[Any]) -> None:
        """Assign holdout IDs to the validator using common attribute names."""
        if hasattr(validator, "test_group_ids"):
            cast(Any, validator).test_group_ids = holdout_ids
        elif hasattr(validator, "test_session_ids"):
            cast(Any, validator).test_session_ids = holdout_ids
        elif hasattr(validator, "test_animal_ids"):
            cast(Any, validator).test_animal_ids = holdout_ids
        else:
            raise AttributeError("Validator does not expose a test_* attribute to assign holdout IDs.")

    def _resolve_class_labels(final_predictor: Any) -> list[Any]:
        """Resolve class labels from a predictor or its underlying estimator."""
        if final_predictor is None:
            raise ValueError("No predictive_transform found on the pipeline; cannot resolve class labels.")
        if getattr(final_predictor, "encoder", None) is not None:
            return list(final_predictor.encoder.classes_)
        estimator = getattr(final_predictor, "estimator", None)
        if estimator is None or not hasattr(estimator, "classes_"):
            raise ValueError(
                "Class labels are unavailable. Provide an encoder on the final predictor or use an estimator "
                "exposing classes_."
            )
        return list(estimator.classes_)

    for seed in range(n_seeds):
        print(f"Tuning with seed {seed}")
        cv_for_seed = copy.deepcopy(cv_strategy)
        validator = None
        if holdout_pool is not None:
            if holdout_assignment is None:
                raise RuntimeError("holdout_assignment was not initialized.")
            if n_holdouts is None:
                raise RuntimeError("n_holdouts was not initialized.")
            sampled_ids = holdout_assignment[holdout_pointer : holdout_pointer + n_holdouts]
            holdout_pointer += n_holdouts
            _assign_holdout_ids(cv_for_seed, sampled_ids)

        tuner = Tuner(
            pipelines_to_tune=pipelines_to_tune,
            cv_strategy=cv_for_seed,
            param_grid=param_grid,
            initial_data_container=initial_data_container,
            mlflow_experiment_name=experiment_name,
            mlflow_metadata=mlflow_metadata,
            random_seed=seed,
            log_artifacts=log_artifacts,
            **kwargs,
            exclude_intertrial_from_scoring=exclude_intertrial_from_scoring,
        )
        tuner.tune(n_trials=n_trials, show_progress_bar=True)

        if log_trial_params:
            print("Optuna trial parameters:")
            for trial in tuner.study.trials:
                status = getattr(trial.state, "name", str(trial.state))
                value = trial.value
                print(f"  Trial {trial.number} [{status}] value={value}: {trial.params}")

        # Get best pipeline and score
        if score_on_holdout:
            holdout_score, validator = tuner.score_best_pipeline_on_holdout(return_validator=True)
            holdout_trues = validator.holdout_true_labels_

            # Compute scoring mask if provided (for container-aware scorers)
            if scoring_mask_func is not None:
                validator.compute_holdout_scoring_mask(scoring_mask_func)
                # Filter holdout_trues to match the scoring mask
                if validator.holdout_scoring_mask_ is not None:
                    holdout_trues = holdout_trues[validator.holdout_scoring_mask_]

            final_predictor = validator.pipeline.predictive_transform
            if final_predictor is None:
                raise ValueError("No predictive_transform found on the pipeline; cannot score on holdout.")
            is_classification = final_predictor.is_classifier

            if is_classification:
                # Classification: get confusion matrix and labels
                conf_matrix = validator.holdout_confusion_matrix_normalized_

                class_labels = _resolve_class_labels(final_predictor)

                cms.append(conf_matrix)
                print(f"Holdout F1 score: {holdout_score}")
            else:
                # Regression: no confusion matrix
                conf_matrix = None
                metric_name = validator.metric_name_
                print(f"Holdout {metric_name.upper()}: {holdout_score:.4f}")

            score = holdout_score
            trues = holdout_trues
        else:
            best_pipeline = tuner.get_best_pipeline()
            final_predictor = best_pipeline.predictive_transform
            if final_predictor is None:
                raise ValueError("No predictive_transform found on the pipeline; cannot score cross-validation.")
            is_classification = final_predictor.is_classifier
            metric_name = cv_for_seed.metric_name_

            needs_cv_eval = is_classification and (plot_each_seed_conf_matrix or plot_combined_conf_matrix)
            if needs_cv_eval:
                validator = copy.deepcopy(cv_for_seed)
                validator.set_pipeline(best_pipeline)
                cv_score = validator.cross_validate(initial_data_container, verbose=validator.verbose)
                cv_trues = np.concatenate(validator.true_labels_) if validator.true_labels_ else np.array([])
                conf_matrix = validator.oof_confusion_matrix_normalized_
                class_labels = _resolve_class_labels(final_predictor)
                cms.append(conf_matrix)
            else:
                cv_score = tuner.study.best_value
                cv_trues = np.array([])
                conf_matrix = None
            score = cv_score
            trues = cv_trues
            print(f"Cross-validation {metric_name.upper()} (no holdout): {cv_score:.4f}")

        # delete validator
        if validator is not None:
            del validator
            gc.collect()

        # log
        test_scores.append(score)  # Name kept for backward compat, but contains regression scores too
        test_trues.append(trues)

        if plot_each_seed_conf_matrix and is_classification and conf_matrix is not None:
            if class_labels is None:
                raise RuntimeError("Class labels were not resolved for classification plotting.")
            labels = class_labels
            source = "holdout" if score_on_holdout else "cv"
            plot_confusion_matrix(
                conf_matrix,
                labels,
                title=f"Confusion matrix ({source}) for seed {seed}, F1 score: {score:.4f}",
                test_trues=trues,
                ylabels=labels,
                xlabels=labels,
            )

        if plot_importances:
            plot_tune_importances(tuner.study)

        if return_pipelines:
            finalized_pipeline = tuner.finalize_best_pipeline(verbose=tuner.verbose_transforms)
            finalized_pipelines.append(finalized_pipeline)

    # Print average score and standard error across all seeds
    if len(test_scores) > 0:
        mean_score = np.mean(test_scores)
        std_error_score = np.std(test_scores, ddof=1) / np.sqrt(len(test_scores)) if len(test_scores) > 1 else 0.0

        # Use appropriate metric name (F1 for classification, or detected metric for regression)
        if is_classification:
            print(f"\nAverage F1 score across {len(test_scores)} seed(s): {mean_score:.4f} ± {std_error_score:.4f}")
        else:
            # metric_name from last validator (all should be same)
            metric_label = metric_name or "score"
            print(
                f"\nAverage {metric_label.upper()} across {len(test_scores)} seed(s): {mean_score:.4f} ± {std_error_score:.4f}"
            )

    # Only plot confusion matrix for classification tasks
    if plot_combined_conf_matrix and class_labels is not None and is_classification and len(cms) > 0:
        source = "holdout" if score_on_holdout else "cv"
        plot_combined_confusion_matrix(
            cms,
            class_labels,
            test_scores,
            test_trues=test_trues,
            title=f"{experiment_name} ({source}), average score: {mean_score:.4f}",
            want_plot=True,
        )

    if return_pipelines:
        return finalized_pipelines
    else:
        return None
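
The summary printed at the end of run_tuning_pipeline is a mean and a standard error across seeds. The source computes it with NumPy (np.std with ddof=1, divided by the square root of the number of scores); the sketch below reproduces the same arithmetic with the standard library instead, so it is an equivalent stand-in rather than the library's code. summarize_scores and the example scores are hypothetical.

```python
import math
import statistics


def summarize_scores(scores: list[float]) -> tuple[float, float]:
    """Return (mean, standard error of the mean) for per-seed scores.

    statistics.stdev is the sample standard deviation (n - 1 in the denominator),
    matching np.std(..., ddof=1). With a single score the standard error is 0.0.
    """
    mean_score = statistics.fmean(scores)
    if len(scores) > 1:
        std_error = statistics.stdev(scores) / math.sqrt(len(scores))
    else:
        std_error = 0.0
    return mean_score, std_error


mean_score, std_error = summarize_scores([0.80, 0.90, 1.00])
print(f"Average score across 3 seed(s): {mean_score:.4f} ± {std_error:.4f}")
```
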