wip mlflow implementation
This commit is contained in:
163
src/tracking/mlflow.py
Normal file
163
src/tracking/mlflow.py
Normal file
@@ -0,0 +1,163 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from dataclasses import dataclass
|
||||
from typing import Any, Protocol
|
||||
|
||||
from src.aws import mlflow as aws_mlflow
|
||||
from src.config import Config, MlflowMode
|
||||
|
||||
|
||||
class Tracker(Protocol):
|
||||
def start_training_run(self, training_job: Any, *, region: str, profile: str, role_arn: str) -> str | None: ...
|
||||
|
||||
def finalize_training_run(
|
||||
self,
|
||||
*,
|
||||
run_id: str | None,
|
||||
training_job_status: Any,
|
||||
) -> str | None: ...
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class NoopTracker:
|
||||
def start_training_run(self, training_job: Any, *, region: str, profile: str, role_arn: str) -> str | None:
|
||||
return None
|
||||
|
||||
def finalize_training_run(self, *, run_id: str | None, training_job_status: Any) -> str | None:
|
||||
return None
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class MlflowTracker:
|
||||
mlflow: Any
|
||||
tracking_uri: str
|
||||
experiment_name: str
|
||||
registered_model_name: str
|
||||
register_trained_models: bool
|
||||
|
||||
@classmethod
|
||||
def from_config(cls, cfg: Config) -> Tracker:
|
||||
if cfg.mlflow.mode is MlflowMode.disabled:
|
||||
return NoopTracker()
|
||||
|
||||
try:
|
||||
import mlflow
|
||||
except ImportError as e:
|
||||
raise RuntimeError(
|
||||
"MLflow is enabled in config but optional dependencies are not installed. "
|
||||
"Install with: qc-cli[mlflow]"
|
||||
) from e
|
||||
|
||||
if not cfg.mlflow.tracking_server_name:
|
||||
raise RuntimeError("mlflow.tracking_server_name is required when MLflow is enabled.")
|
||||
|
||||
tracking_uri = aws_mlflow.get_tracking_server_arn(
|
||||
cfg.aws.region,
|
||||
cfg.aws.profile,
|
||||
cfg.mlflow.tracking_server_name,
|
||||
)
|
||||
mlflow.set_tracking_uri(tracking_uri)
|
||||
mlflow.set_experiment(cfg.mlflow.experiment_name)
|
||||
|
||||
return cls(
|
||||
mlflow=mlflow,
|
||||
tracking_uri=tracking_uri,
|
||||
experiment_name=cfg.mlflow.experiment_name,
|
||||
registered_model_name=cfg.mlflow.registered_model_name,
|
||||
register_trained_models=cfg.mlflow.register_trained_models,
|
||||
)
|
||||
|
||||
def start_training_run(self, training_job: Any, *, region: str, profile: str, role_arn: str) -> str | None:
|
||||
run = self.mlflow.start_run(run_name=training_job.job_name)
|
||||
run_id = str(run.info.run_id)
|
||||
|
||||
params = {
|
||||
"aws.region": region,
|
||||
"aws.profile": profile,
|
||||
"sagemaker.role_arn": role_arn,
|
||||
"sagemaker.job_name": training_job.job_name,
|
||||
"sagemaker.training_image": training_job.image_uri,
|
||||
"sagemaker.instance_type": training_job.instance_type,
|
||||
"sagemaker.instance_count": training_job.instance_count,
|
||||
"sagemaker.s3_train_uri": training_job.s3_train_uri,
|
||||
"sagemaker.s3_output_path": training_job.s3_output_path,
|
||||
"sagemaker.entry_point": training_job.entry_point,
|
||||
"sagemaker.source_dir": training_job.source_dir,
|
||||
}
|
||||
self._log_params(params)
|
||||
self._log_params({f"hyperparameters.{key}": value for key, value in training_job.hyperparameters.items()})
|
||||
self.mlflow.set_tags(
|
||||
{
|
||||
"qc_cli.stage": "prerelease",
|
||||
"qc_cli.command": "train start",
|
||||
"sagemaker.job_name": training_job.job_name,
|
||||
}
|
||||
)
|
||||
self.mlflow.end_run()
|
||||
return run_id
|
||||
|
||||
def finalize_training_run(self, *, run_id: str | None, training_job_status: Any) -> str | None:
|
||||
if not run_id:
|
||||
return None
|
||||
|
||||
with self.mlflow.start_run(run_id=run_id):
|
||||
self._log_params(
|
||||
{
|
||||
"sagemaker.training_status": training_job_status.status,
|
||||
"sagemaker.created_at": training_job_status.created,
|
||||
"sagemaker.modified_at": training_job_status.modified,
|
||||
"sagemaker.model_artifacts": training_job_status.model_artifacts,
|
||||
"sagemaker.failure_reason": training_job_status.failure_reason,
|
||||
}
|
||||
)
|
||||
self._log_final_metrics(training_job_status.raw)
|
||||
self.mlflow.set_tag("qc_cli.command", "train status")
|
||||
|
||||
if training_job_status.status != "Completed" or not training_job_status.model_artifacts:
|
||||
self.mlflow.set_tag("qc_cli.training_terminal_status", training_job_status.status)
|
||||
return None
|
||||
|
||||
if not self.register_trained_models:
|
||||
return None
|
||||
|
||||
client = self.mlflow.tracking.MlflowClient()
|
||||
self._ensure_registered_model(client, self.registered_model_name)
|
||||
version = client.create_model_version(
|
||||
name=self.registered_model_name,
|
||||
source=training_job_status.model_artifacts,
|
||||
run_id=run_id,
|
||||
tags={
|
||||
"qc_cli.stage": "prerelease",
|
||||
"sagemaker.job_name": training_job_status.name,
|
||||
},
|
||||
)
|
||||
version_number = str(version.version)
|
||||
self._set_alias(client, self.registered_model_name, "prerelease-latest", version_number)
|
||||
self.mlflow.set_tag("qc_cli.registered_model_name", self.registered_model_name)
|
||||
self.mlflow.set_tag("qc_cli.registered_model_version", version_number)
|
||||
return version_number
|
||||
|
||||
def _log_params(self, params: dict[str, Any]) -> None:
|
||||
cleaned = {key: str(value) for key, value in params.items() if value is not None}
|
||||
if cleaned:
|
||||
self.mlflow.log_params(cleaned)
|
||||
|
||||
def _log_final_metrics(self, training_job: dict[str, Any]) -> None:
|
||||
metrics = {}
|
||||
for metric in training_job.get("FinalMetricDataList", []):
|
||||
name = metric.get("MetricName")
|
||||
value = metric.get("Value")
|
||||
if name and value is not None:
|
||||
metrics[str(name)] = float(value)
|
||||
if metrics:
|
||||
self.mlflow.log_metrics(metrics)
|
||||
|
||||
def _ensure_registered_model(self, client: Any, name: str) -> None:
|
||||
try:
|
||||
client.get_registered_model(name)
|
||||
except Exception:
|
||||
client.create_registered_model(name)
|
||||
|
||||
def _set_alias(self, client: Any, name: str, alias: str, version: str) -> None:
|
||||
if hasattr(client, "set_registered_model_alias"):
|
||||
client.set_registered_model_alias(name, alias, version)
|
||||
Reference in New Issue
Block a user