"""Experiment tracking for training jobs.

Defines the ``Tracker`` protocol and two implementations: ``NoopTracker``,
which records nothing, and ``MlflowTracker``, which records training runs in
MLflow and can register the resulting model artifacts as a model version.
"""

from __future__ import annotations

import os
from dataclasses import dataclass
from typing import Any, Protocol

from src.aws import mlflow as aws_mlflow
from src.config import Config, MlflowMode


class Tracker(Protocol):
    """Structural interface shared by every tracker implementation."""

    def start_training_run(
        self,
        training_job: Any,
        *,
        region: str,
        profile: str,
        role_arn: str,
    ) -> str | None:
        """Record the start of a training job; return a run id or ``None``."""
        ...

    def finalize_training_run(
        self,
        *,
        run_id: str | None,
        training_job_status: Any,
    ) -> str | None:
        """Record the job outcome; return a model version or ``None``."""
        ...


@dataclass(frozen=True)
class NoopTracker:
    """Tracker that records nothing; both methods always return ``None``."""

    def start_training_run(
        self,
        training_job: Any,
        *,
        region: str,
        profile: str,
        role_arn: str,
    ) -> str | None:
        """Ignore the training job and return ``None``."""
        return None

    def finalize_training_run(self, *, run_id: str | None, training_job_status: Any) -> str | None:
        """Ignore the job status and return ``None``."""
        return None


@dataclass(frozen=True)
class MlflowTracker:
    """Tracker that logs training runs to MLflow.

    Instances are normally built with :meth:`from_config`, which imports
    MLflow lazily and points it at the configured tracking server.
    """

    # The imported ``mlflow`` module (kept as a field so it is only imported
    # when tracking is enabled).
    mlflow: Any
    # Value passed to ``mlflow.set_tracking_uri``; ``from_config`` fills it
    # with the result of ``aws_mlflow.get_tracking_server_arn``.
    tracking_uri: str
    # Experiment selected via ``mlflow.set_experiment``.
    experiment_name: str
    # Registry name under which model versions are created.
    registered_model_name: str
    # When False, ``finalize_training_run`` never creates a model version.
    register_trained_models: bool

    @classmethod
    def from_config(cls, cfg: Config) -> Tracker:
        """Build the tracker selected by ``cfg``.

        Returns a ``NoopTracker`` when MLflow is disabled; otherwise imports
        MLflow, resolves the tracking server, selects the experiment and
        returns a configured ``MlflowTracker``.

        Raises:
            RuntimeError: if MLflow is enabled but cannot be imported, or if
                no tracking server name is available in the configuration.
        """
        if cfg.mlflow.mode is MlflowMode.disabled:
            return NoopTracker()

        # Set before importing mlflow; setdefault keeps any value the user
        # already exported.
        os.environ.setdefault("MLFLOW_SUPPRESS_PRINTING_URL_TO_STDOUT", "true")
        try:
            import mlflow
        except ImportError as e:
            raise RuntimeError(
                "MLflow is enabled in config but optional dependencies are not installed. "
                "Install with: qc-cli[mlflow]"
            ) from e

        tracking_server_name = cfg.effective_mlflow_tracking_server_name
        if not tracking_server_name:
            raise RuntimeError("MLflow tracking server name could not be resolved.")

        tracking_uri = aws_mlflow.get_tracking_server_arn(
            cfg.aws.region,
            cfg.aws.profile,
            tracking_server_name,
        )
        mlflow.set_tracking_uri(tracking_uri)
        mlflow.set_experiment(cfg.mlflow.experiment_name)
        return cls(
            mlflow=mlflow,
            tracking_uri=tracking_uri,
            experiment_name=cfg.mlflow.experiment_name,
            registered_model_name=cfg.mlflow.registered_model_name,
            register_trained_models=cfg.mlflow.register_trained_models,
        )

    def start_training_run(
        self,
        training_job: Any,
        *,
        region: str,
        profile: str,
        role_arn: str,
    ) -> str | None:
        """Create an MLflow run for ``training_job`` and log its settings.

        The run is named after the job, receives the AWS/SageMaker settings
        and hyperparameters as params plus a few tags, and is closed again
        before returning so it can be reopened by ``finalize_training_run``.

        Returns:
            The id of the created run, as a string.
        """
        # The context manager guarantees the run is ended even when logging
        # raises; a bare start_run()/end_run() pair would leave the run
        # active and make every later start_run() in this process fail.
        with self.mlflow.start_run(run_name=training_job.job_name) as run:
            run_id = str(run.info.run_id)
            self._log_params(
                {
                    "aws.region": region,
                    "aws.profile": profile,
                    "sagemaker.role_arn": role_arn,
                    "sagemaker.job_name": training_job.job_name,
                    "sagemaker.training_image": training_job.image_uri,
                    "sagemaker.instance_type": training_job.instance_type,
                    "sagemaker.instance_count": training_job.instance_count,
                    "sagemaker.s3_train_uri": training_job.s3_train_uri,
                    "sagemaker.s3_output_path": training_job.s3_output_path,
                    "sagemaker.entry_point": training_job.entry_point,
                    "sagemaker.source_dir": training_job.source_dir,
                }
            )
            # A job without hyperparameters (None) is treated as having none.
            hyperparameters = training_job.hyperparameters or {}
            self._log_params(
                {f"hyperparameters.{key}": value for key, value in hyperparameters.items()}
            )
            self.mlflow.set_tags(
                {
                    "qc_cli.stage": "prerelease",
                    "qc_cli.command": "train start",
                    "sagemaker.job_name": training_job.job_name,
                }
            )
        return run_id

    def finalize_training_run(self, *, run_id: str | None, training_job_status: Any) -> str | None:
        """Log the outcome of a training job and optionally register the model.

        Reopens the run ``run_id``, logs status params and final metrics, and,
        when the job completed with model artifacts and registration is
        enabled, creates a model version and points the ``prerelease-latest``
        alias at it.

        Returns:
            The new model version number as a string, or ``None`` when there
            is no run id, the job did not complete with artifacts, or
            registration is disabled.
        """
        if not run_id:
            return None

        with self.mlflow.start_run(run_id=run_id):
            # NOTE(review): MLflow rejects re-logging a param with a different
            # value. If this method can run more than once per run (e.g. while
            # the job is still in progress), these params would need to be
            # tags instead -- confirm against callers.
            self._log_params(
                {
                    "sagemaker.training_status": training_job_status.status,
                    "sagemaker.created_at": training_job_status.created,
                    "sagemaker.modified_at": training_job_status.modified,
                    "sagemaker.model_artifacts": training_job_status.model_artifacts,
                    "sagemaker.failure_reason": training_job_status.failure_reason,
                }
            )
            self._log_final_metrics(training_job_status.raw)
            self.mlflow.set_tag("qc_cli.command", "train status")

            if training_job_status.status != "Completed" or not training_job_status.model_artifacts:
                # NOTE(review): the tag is written for every non-successful
                # status passed in, whether or not that status is terminal.
                self.mlflow.set_tag("qc_cli.training_terminal_status", training_job_status.status)
                return None

            if not self.register_trained_models:
                return None

            client = self.mlflow.tracking.MlflowClient()
            self._ensure_registered_model(client, self.registered_model_name)
            version = client.create_model_version(
                name=self.registered_model_name,
                source=training_job_status.model_artifacts,
                run_id=run_id,
                tags={
                    "qc_cli.stage": "prerelease",
                    "sagemaker.job_name": training_job_status.name,
                },
            )
            version_number = str(version.version)
            self._set_alias(client, self.registered_model_name, "prerelease-latest", version_number)
            self.mlflow.set_tag("qc_cli.registered_model_name", self.registered_model_name)
            self.mlflow.set_tag("qc_cli.registered_model_version", version_number)
            return version_number

    def _log_params(self, params: dict[str, Any]) -> None:
        """Log ``params`` to the active run, stringified, skipping ``None`` values."""
        cleaned = {key: str(value) for key, value in params.items() if value is not None}
        if cleaned:
            self.mlflow.log_params(cleaned)

    def _log_final_metrics(self, training_job: dict[str, Any] | None) -> None:
        """Log the entries of ``FinalMetricDataList`` as MLflow metrics.

        Entries without a ``MetricName`` or ``Value`` are skipped. A missing
        payload, a missing key, or a ``None`` list all mean "no metrics".
        """
        entries = (training_job or {}).get("FinalMetricDataList") or []
        metrics: dict[str, float] = {}
        for entry in entries:
            name = entry.get("MetricName")
            value = entry.get("Value")
            if name and value is not None:
                metrics[str(name)] = float(value)
        if metrics:
            self.mlflow.log_metrics(metrics)

    def _ensure_registered_model(self, client: Any, name: str) -> None:
        """Create the registered model ``name`` if looking it up fails."""
        try:
            client.get_registered_model(name)
        except Exception:
            # Deliberately broad: any lookup failure is treated as "missing".
            # An error raised by the create call itself is not caught here.
            client.create_registered_model(name)

    def _set_alias(self, client: Any, name: str, alias: str, version: str) -> None:
        """Point ``alias`` at ``version`` when the client supports aliases."""
        # Clients lacking set_registered_model_alias are skipped silently.
        if hasattr(client, "set_registered_model_alias"):
            client.set_registered_model_alias(name, alias, version)