4 Commits

Author SHA1 Message Date
58681cef82 command to create presigned URL for MLFlow 2026-05-27 10:52:08 -04:00
e1c8d6574f omit server name when created with config 2026-05-27 10:23:53 -04:00
35d25d8967 Merge branch 'main' into ml-flow 2026-05-27 08:58:46 -04:00
b907a74525 wip mlflow implementation 2026-05-26 15:03:53 -04:00
13 changed files with 2281 additions and 38 deletions

View File

@@ -78,9 +78,13 @@ To provision an MLflow tracking server, set:
```yaml ```yaml
mlflow: mlflow:
mode: create mode: create
tracking_server_name: your-tracking-server-name experiment_name: qc-cli-training
registered_model_name: qc-cli-model
register_trained_models: true
``` ```
In `create` mode, the CLI manages the tracking server name from `infra.stack_name`; you do not need to set `tracking_server_name`.
To use an existing MLflow tracking server, set: To use an existing MLflow tracking server, set:
```yaml ```yaml
@@ -89,6 +93,22 @@ mlflow:
tracking_server_name: your-tracking-server-name tracking_server_name: your-tracking-server-name
``` ```
Install the optional MLflow dependencies before enabling MLflow:
```bash
uv sync --extra mlflow
```
When MLflow is enabled, `train start` creates an MLflow run for the SageMaker job. `train status` finalizes that run once the job reaches a terminal state and registers completed model artifacts as pre-release model versions using the `prerelease-latest` MLflow alias.
To open the managed SageMaker MLflow UI, request a fresh presigned URL:
```bash
qc-cli infra mlflow-url --config config.yaml
```
This works for `mode: create` and for `mode: existing` when the existing server is managed by Amazon SageMaker. In `create` mode, the command uses the CLI-managed tracking server name. In `existing` mode, it uses `mlflow.tracking_server_name`. If the existing MLflow server is external to SageMaker, open it with that server's own URL instead.
## Commands ## Commands
### `init` ### `init`
@@ -106,6 +126,7 @@ qc-cli infra setup Deploy the CDK stack
qc-cli infra setup --no-bootstrap Deploy without running CDK bootstrap qc-cli infra setup --no-bootstrap Deploy without running CDK bootstrap
qc-cli infra setup --cloudformation-execution-policy <arn> Set CDK bootstrap execution policy ARN qc-cli infra setup --cloudformation-execution-policy <arn> Set CDK bootstrap execution policy ARN
qc-cli infra status Show CDK stack/resource status qc-cli infra status Show CDK stack/resource status
qc-cli infra mlflow-url Print a presigned MLflow UI URL
qc-cli infra destroy Destroy stack, retaining S3 data qc-cli infra destroy Destroy stack, retaining S3 data
qc-cli infra destroy --yes Destroy stack without confirmation qc-cli infra destroy --yes Destroy stack without confirmation
qc-cli infra destroy --delete-bucket-data Destroy stack and delete S3 data qc-cli infra destroy --delete-bucket-data Destroy stack and delete S3 data

View File

@@ -72,10 +72,11 @@ if [[ "${SKIP_UPLOAD}" == false ]]; then
run uv run qc-cli upload "${DATASET_DIR}" --config "${CONFIG_PATH}" run uv run qc-cli upload "${DATASET_DIR}" --config "${CONFIG_PATH}"
fi fi
TRAIN_OUTPUT="$(uv run qc-cli train start --config "${CONFIG_PATH}")" TRAIN_OUTPUT_FILE="$(mktemp)"
echo "${TRAIN_OUTPUT}" trap 'rm -f "${TRAIN_OUTPUT_FILE}"' EXIT
run uv run qc-cli train start --config "${CONFIG_PATH}" | tee "${TRAIN_OUTPUT_FILE}"
JOB_NAME="$(printf '%s\n' "${TRAIN_OUTPUT}" | grep -Eo 'qc-cli-[0-9]{8}-[0-9]{6}' | tail -n 1)" JOB_NAME="$(grep -Eo 'qc-cli-[0-9]{8}-[0-9]{6}' "${TRAIN_OUTPUT_FILE}" | tail -n 1)"
if [[ -z "${JOB_NAME}" ]]; then if [[ -z "${JOB_NAME}" ]]; then
echo "Could not find training job name in qc-cli output." >&2 echo "Could not find training job name in qc-cli output." >&2
exit 1 exit 1

View File

@@ -16,6 +16,12 @@ dependencies = [
"pyyaml>=6.0.3", "pyyaml>=6.0.3",
] ]
[project.optional-dependencies]
mlflow = [
"mlflow>=3.0",
"sagemaker-mlflow>=0.4.0",
]
[project.scripts] [project.scripts]
qc-cli = "src.main:app" qc-cli = "src.main:app"
@@ -25,6 +31,7 @@ packages = ["src"]
[dependency-groups] [dependency-groups]
dev = [ dev = [
"boto3-stubs[iam,s3,sagemaker]", "boto3-stubs[iam,s3,sagemaker]",
"pytest>=8.0",
"pyright>=1.1.409", "pyright>=1.1.409",
"types-PyYAML", "types-PyYAML",
"ruff>=0.4", "ruff>=0.4",

View File

@@ -17,3 +17,20 @@ def describe_tracking_server(region: str, profile: str, name: str) -> dict[str,
): ):
return None return None
raise raise
def get_tracking_server_arn(region: str, profile: str, name: str) -> str:
server = describe_tracking_server(region, profile, name)
if not server:
raise ValueError(f"MLflow tracking server not found: {name}")
arn = server.get("TrackingServerArn")
if not arn:
raise ValueError(f"MLflow tracking server has no ARN: {name}")
return str(arn)
def create_presigned_tracking_server_url(region: str, profile: str, name: str) -> str:
client = boto3.Session(profile_name=profile, region_name=region).client("sagemaker")
response = client.create_presigned_mlflow_tracking_server_url(TrackingServerName=name)
return str(response["AuthorizedUrl"])

View File

@@ -36,6 +36,7 @@ class TrainingJobStatus:
modified: datetime | None modified: datetime | None
model_artifacts: str | None model_artifacts: str | None
failure_reason: str | None failure_reason: str | None
raw: dict[str, Any] = field(default_factory=dict)
def _sm(session: Boto3SessionKwargs) -> SageMakerClient: def _sm(session: Boto3SessionKwargs) -> SageMakerClient:
@@ -116,6 +117,7 @@ def get_training_job_status(session: Boto3SessionKwargs, job_name: str) -> Train
modified=resp.get("LastModifiedTime"), modified=resp.get("LastModifiedTime"),
model_artifacts=resp.get("ModelArtifacts", {}).get("S3ModelArtifacts"), model_artifacts=resp.get("ModelArtifacts", {}).get("S3ModelArtifacts"),
failure_reason=resp.get("FailureReason"), failure_reason=resp.get("FailureReason"),
raw=dict(resp),
) )

View File

@@ -77,7 +77,8 @@ def setup(
if outputs.get("SageMakerRoleArn"): if outputs.get("SageMakerRoleArn"):
CONSOLE.print(f"[green]✓[/green] IAM role: {outputs['SageMakerRoleArn']}") CONSOLE.print(f"[green]✓[/green] IAM role: {outputs['SageMakerRoleArn']}")
if cfg.mlflow.mode is MlflowMode.create and outputs.get("MlflowTrackingServerArn"): if cfg.mlflow.mode is MlflowMode.create and outputs.get("MlflowTrackingServerArn"):
CONSOLE.print(f"[green]✓[/green] MLflow: {outputs['MlflowTrackingServerArn']}") mlflow_name = outputs.get("MlflowTrackingServerName", cfg.managed_mlflow_tracking_server_name)
CONSOLE.print(f"[green]✓[/green] MLflow: {mlflow_name}")
elif cfg.mlflow.mode is MlflowMode.existing: elif cfg.mlflow.mode is MlflowMode.existing:
CONSOLE.print(f"[green]✓[/green] MLflow: {cfg.mlflow.tracking_server_name}") CONSOLE.print(f"[green]✓[/green] MLflow: {cfg.mlflow.tracking_server_name}")
CONSOLE.print("\n[bold green]Infrastructure ready.[/bold green]") CONSOLE.print("\n[bold green]Infrastructure ready.[/bold green]")
@@ -102,7 +103,7 @@ def status(config: str = CONFIG_OPT) -> None:
if cfg.mlflow.mode is not MlflowMode.disabled: if cfg.mlflow.mode is not MlflowMode.disabled:
table.add_row( table.add_row(
"MLflow", "MLflow",
cfg.mlflow.tracking_server_name or "-", cfg.effective_mlflow_tracking_server_name or "-",
"[red]unknown[/red]", "[red]unknown[/red]",
"-", "-",
) )
@@ -126,7 +127,7 @@ def status(config: str = CONFIG_OPT) -> None:
if cfg.mlflow.mode is MlflowMode.create: if cfg.mlflow.mode is MlflowMode.create:
table.add_row( table.add_row(
"MLflow", "MLflow",
cfg.mlflow.tracking_server_name or "-", outputs.get("MlflowTrackingServerName", cfg.managed_mlflow_tracking_server_name),
"[green]managed[/green]", "[green]managed[/green]",
outputs.get("MlflowTrackingServerArn", outputs.get("MlflowArtifactUri", "-")), outputs.get("MlflowTrackingServerArn", outputs.get("MlflowArtifactUri", "-")),
) )
@@ -149,6 +150,32 @@ def status(config: str = CONFIG_OPT) -> None:
CONSOLE.print(table) CONSOLE.print(table)
@app.command(name="mlflow-url")
def mlflow_url(config: str = CONFIG_OPT) -> None:
"""Print a presigned URL for the configured MLflow tracking server."""
cfg = load_cfg(config)
tracking_server_name = _mlflow_tracking_server_name(cfg)
try:
url = mlflow.create_presigned_tracking_server_url(
cfg.aws.region,
cfg.aws.profile,
tracking_server_name,
)
except Exception as e:
CONSOLE.print("[yellow]Could not create a SageMaker MLflow UI URL.[/yellow]")
CONSOLE.print(f"Tracking server: [cyan]{tracking_server_name}[/cyan]")
CONSOLE.print(f"Reason: {e}")
CONSOLE.print(
"This command can create presigned URLs only for MLflow tracking servers managed by "
"Amazon SageMaker. If this is an external MLflow server, open it with that server's own URL."
)
raise typer.Exit(1)
CONSOLE.print(f"MLflow tracking server: [cyan]{tracking_server_name}[/cyan]")
CONSOLE.print(f"MLflow UI: {url}")
@app.command() @app.command()
def destroy( def destroy(
config: str = CONFIG_OPT, config: str = CONFIG_OPT,
@@ -209,6 +236,15 @@ def _role_name(configured_name: str, role_arn: str) -> str:
return role_arn.rsplit("/", 1)[-1] return role_arn.rsplit("/", 1)[-1]
return "-" return "-"
def _mlflow_tracking_server_name(cfg: Config) -> str:
name = cfg.effective_mlflow_tracking_server_name
if not name:
CONSOLE.print("[red]MLflow is disabled in config.yaml.[/red]")
raise typer.Exit(1)
return name
def _destroy_account_id(config_path: str, cfg: Config) -> str: def _destroy_account_id(config_path: str, cfg: Config) -> str:
config_dir = str(Path(config_path).parent) config_dir = str(Path(config_path).parent)
state = read_infra_state(config_dir) state = read_infra_state(config_dir)

View File

@@ -8,8 +8,9 @@ from src import state as state_ops
from src.aws import iam from src.aws import iam
from src.aws import sagemaker as sm_ops from src.aws import sagemaker as sm_ops
from src.commands.utils import CONFIG_OPT, CONSOLE, load_cfg from src.commands.utils import CONFIG_OPT, CONSOLE, load_cfg
from src.config import Config from src.config import Config, MlflowMode
from src.infra.state import read_infra_state from src.infra.state import read_infra_state
from src.tracking.mlflow import MlflowTracker
app = typer.Typer(help="Manage SageMaker training jobs") app = typer.Typer(help="Manage SageMaker training jobs")
@@ -22,6 +23,14 @@ _STATUS_COLOR = {
} }
def _tracker(cfg):
try:
return MlflowTracker.from_config(cfg)
except Exception as e:
CONSOLE.print(f"[red]MLflow setup failed: {e}[/red]")
raise typer.Exit(1)
def _config_dir(config_path: str) -> str: def _config_dir(config_path: str) -> str:
return str(Path(config_path).parent) return str(Path(config_path).parent)
@@ -58,6 +67,7 @@ def start(config: str = CONFIG_OPT) -> None:
CONSOLE.print(f"[red]{e}[/red]") CONSOLE.print(f"[red]{e}[/red]")
raise typer.Exit(1) raise typer.Exit(1)
tracker = _tracker(cfg)
job_name = f"qc-cli-{datetime.now().strftime('%Y%m%d-%H%M%S')}" job_name = f"qc-cli-{datetime.now().strftime('%Y%m%d-%H%M%S')}"
s3_train_uri = f"s3://{cfg.s3.bucket}/{cfg.s3.data_prefix}" s3_train_uri = f"s3://{cfg.s3.bucket}/{cfg.s3.data_prefix}"
s3_output = f"s3://{cfg.s3.bucket}/{cfg.s3.model_prefix}" s3_output = f"s3://{cfg.s3.bucket}/{cfg.s3.model_prefix}"
@@ -77,9 +87,21 @@ def start(config: str = CONFIG_OPT) -> None:
) )
sm_ops.start_training_job(cfg.aws.boto3_session, training_job) sm_ops.start_training_job(cfg.aws.boto3_session, training_job)
state_ops.write_state(_config_dir(config), last_training_job=job_name) st = state_ops.store(config)
st.set_last_training_job(job_name)
run_id = tracker.start_training_run(
training_job,
region=cfg.aws.region,
profile=cfg.aws.profile,
role_arn=role_arn,
)
if run_id:
st.update_training_job(job_name, mlflow_run_id=run_id)
CONSOLE.print(f"[green]✓[/green] Job submitted: [bold]{job_name}[/bold]") CONSOLE.print(f"[green]✓[/green] Job submitted: [bold]{job_name}[/bold]")
if run_id:
CONSOLE.print(f"MLflow run: [cyan]{run_id}[/cyan]")
CONSOLE.print("Open MLflow: [cyan]qc-cli infra mlflow-url[/cyan]")
CONSOLE.print("Track progress: [cyan]qc-cli train status[/cyan]") CONSOLE.print("Track progress: [cyan]qc-cli train status[/cyan]")
@@ -90,9 +112,10 @@ def status(
) -> None: ) -> None:
"""Show training job status.""" """Show training job status."""
cfg = load_cfg(config) cfg = load_cfg(config)
st = state_ops.store(config)
if not job_name: if not job_name:
job_name = state_ops.get_last_training_job(_config_dir(config)) job_name = st.get_last_training_job()
if not job_name: if not job_name:
CONSOLE.print( CONSOLE.print(
"[red]No training job found in state. Pass a job name or run 'qc-cli train start' first.[/red]" "[red]No training job found in state. Pass a job name or run 'qc-cli train start' first.[/red]"
@@ -111,6 +134,25 @@ def status(
if status.failure_reason: if status.failure_reason:
CONSOLE.print(f"[red]Failure: {status.failure_reason}[/red]") CONSOLE.print(f"[red]Failure: {status.failure_reason}[/red]")
job_state = st.get_training_job(job_name)
run_id = job_state.get("mlflow_run_id")
already_registered = job_state.get("registered_model_version")
if run_id and not already_registered and status.status in {"Completed", "Failed", "Stopped"}:
tracker = _tracker(cfg)
version = tracker.finalize_training_run(
run_id=str(run_id),
training_job_status=status,
)
updates = {"mlflow_finalized_status": status.status}
if version:
updates["registered_model_version"] = version
st.update_training_job(job_name, **updates)
if version:
st.set_latest_prerelease_model_version(version)
CONSOLE.print(f"MLflow model version: [cyan]{version}[/cyan] ([cyan]prerelease-latest[/cyan])")
if run_id and cfg.mlflow.mode is not MlflowMode.disabled:
CONSOLE.print("Open MLflow: [cyan]qc-cli infra mlflow-url[/cyan]")
@app.command(name="list") @app.command(name="list")
def list_jobs( def list_jobs(

View File

@@ -1,5 +1,5 @@
import re import re
from enum import Enum from enum import StrEnum
from typing import Any, Literal, TypedDict from typing import Any, Literal, TypedDict
from mypy_boto3_s3.literals import BucketLocationConstraintType from mypy_boto3_s3.literals import BucketLocationConstraintType
@@ -7,13 +7,13 @@ from mypy_boto3_sagemaker.literals import TrainingInstanceTypeType
from pydantic import BaseModel, Field, model_validator from pydantic import BaseModel, Field, model_validator
class MlflowMode(str, Enum): class MlflowMode(StrEnum):
disabled = "disabled" disabled = "disabled"
create = "create" create = "create"
existing = "existing" existing = "existing"
class MlflowServerSize(str, Enum): class MlflowServerSize(StrEnum):
small = "Small" small = "Small"
medium = "Medium" medium = "Medium"
large = "Large" large = "Large"
@@ -83,6 +83,9 @@ class SageMakerConfig(BaseModel):
class MlflowConfig(BaseModel): class MlflowConfig(BaseModel):
mode: MlflowMode = MlflowMode.disabled mode: MlflowMode = MlflowMode.disabled
tracking_server_name: str | None = None tracking_server_name: str | None = None
experiment_name: str = "qc-cli-training"
registered_model_name: str = "qc-cli-model"
register_trained_models: bool = True
artifact_prefix: str = "mlflow/" artifact_prefix: str = "mlflow/"
tracking_server_size: MlflowServerSize = MlflowServerSize.small tracking_server_size: MlflowServerSize = MlflowServerSize.small
mlflow_version: str | None = None mlflow_version: str | None = None
@@ -91,8 +94,8 @@ class MlflowConfig(BaseModel):
@model_validator(mode="after") @model_validator(mode="after")
def require_tracking_server_name(self) -> "MlflowConfig": def require_tracking_server_name(self) -> "MlflowConfig":
if self.mode in {MlflowMode.create, MlflowMode.existing} and not self.tracking_server_name: if self.mode is MlflowMode.existing and not self.tracking_server_name:
raise ValueError("mlflow.tracking_server_name is required when mlflow.mode is create or existing") raise ValueError("mlflow.tracking_server_name is required when mlflow.mode is existing")
return self return self
@@ -102,3 +105,15 @@ class Config(BaseModel):
s3: S3Config = Field(default_factory=S3Config) s3: S3Config = Field(default_factory=S3Config)
sagemaker: SageMakerConfig = Field(default_factory=SageMakerConfig) sagemaker: SageMakerConfig = Field(default_factory=SageMakerConfig)
mlflow: MlflowConfig = Field(default_factory=MlflowConfig) mlflow: MlflowConfig = Field(default_factory=MlflowConfig)
@property
def managed_mlflow_tracking_server_name(self) -> str:
return f"{self.infra.stack_name}-mlflow"
@property
def effective_mlflow_tracking_server_name(self) -> str | None:
if self.mlflow.mode is MlflowMode.disabled:
return None
if self.mlflow.mode is MlflowMode.existing:
return self.mlflow.tracking_server_name
return self.managed_mlflow_tracking_server_name

View File

@@ -74,6 +74,7 @@ class QCStack(Stack):
CfnOutput(self, "SageMakerRoleArn", value=role.attr_arn) CfnOutput(self, "SageMakerRoleArn", value=role.attr_arn)
if config.mlflow.mode is MlflowMode.create: if config.mlflow.mode is MlflowMode.create:
tracking_server_name = config.managed_mlflow_tracking_server_name
artifact_prefix = config.mlflow.artifact_prefix.strip("/") artifact_prefix = config.mlflow.artifact_prefix.strip("/")
artifact_uri = ( artifact_uri = (
f"s3://{data_bucket.bucket_name}/{artifact_prefix}/" f"s3://{data_bucket.bucket_name}/{artifact_prefix}/"
@@ -145,14 +146,14 @@ class QCStack(Stack):
"MlflowTrackingServer", "MlflowTrackingServer",
artifact_store_uri=artifact_uri, artifact_store_uri=artifact_uri,
role_arn=mlflow_role.attr_arn, role_arn=mlflow_role.attr_arn,
tracking_server_name=config.mlflow.tracking_server_name or "", tracking_server_name=tracking_server_name,
automatic_model_registration=config.mlflow.automatic_model_registration, automatic_model_registration=config.mlflow.automatic_model_registration,
mlflow_version=config.mlflow.mlflow_version, mlflow_version=config.mlflow.mlflow_version,
tracking_server_size=config.mlflow.tracking_server_size.value, tracking_server_size=config.mlflow.tracking_server_size.value,
weekly_maintenance_window_start=config.mlflow.weekly_maintenance_window_start, weekly_maintenance_window_start=config.mlflow.weekly_maintenance_window_start,
) )
CfnOutput(self, "MlflowTrackingServerName", value=config.mlflow.tracking_server_name or "") CfnOutput(self, "MlflowTrackingServerName", value=tracking_server_name)
CfnOutput(self, "MlflowTrackingServerArn", value=tracking_server.attr_tracking_server_arn) CfnOutput(self, "MlflowTrackingServerArn", value=tracking_server.attr_tracking_server_arn)
CfnOutput(self, "MlflowArtifactUri", value=artifact_uri) CfnOutput(self, "MlflowArtifactUri", value=artifact_uri)
CfnOutput(self, "MlflowRoleArn", value=mlflow_role.attr_arn) CfnOutput(self, "MlflowRoleArn", value=mlflow_role.attr_arn)

View File

@@ -1,30 +1,65 @@
import json import json
from dataclasses import dataclass
from pathlib import Path from pathlib import Path
from typing import Any from typing import Any
STATE_FILE = ".qc-cli.json" STATE_FILE = ".qc-cli.json"
def _path(config_dir: str) -> Path: @dataclass(frozen=True)
return Path(config_dir) / STATE_FILE class CliStateStore:
config_dir: str = "."
@property
def path(self) -> Path:
return Path(self.config_dir) / STATE_FILE
def read_state(config_dir: str = ".") -> dict[str, Any]: def read(self) -> dict[str, Any]:
path = _path(config_dir) if not self.path.exists():
if not path.exists():
return {} return {}
with open(path) as f: with open(self.path) as f:
return json.load(f) value = json.load(f)
return dict(value) if isinstance(value, dict) else {}
def update(self, **updates: Any) -> None:
def write_state(config_dir: str = ".", **updates: str | None) -> None: state = self.read()
path = _path(config_dir)
state = read_state(config_dir)
state.update(updates) state.update(updates)
with open(path, "w") as f: self._write(state)
def get(self, key: str, default: Any = None) -> Any:
return self.read().get(key, default)
def get_last_training_job(self) -> str | None:
value = self.get("last_training_job")
return str(value) if value else None
def set_last_training_job(self, job_name: str) -> None:
self.update(last_training_job=job_name)
def get_training_job(self, job_name: str) -> dict[str, Any]:
jobs = self._training_jobs(self.read())
value = jobs.get(job_name, {})
return dict(value) if isinstance(value, dict) else {}
def update_training_job(self, job_name: str, **updates: Any) -> None:
state = self.read()
jobs = self._training_jobs(state)
jobs[job_name] = {**jobs.get(job_name, {}), **updates}
state["training_jobs"] = jobs
self._write(state)
def set_latest_prerelease_model_version(self, version: str) -> None:
self.update(latest_prerelease_model_version=version)
def _write(self, state: dict[str, Any]) -> None:
with open(self.path, "w") as f:
json.dump(state, f, indent=2) json.dump(state, f, indent=2)
def _training_jobs(self, state: dict[str, Any]) -> dict[str, Any]:
value = state.get("training_jobs", {})
return dict(value) if isinstance(value, dict) else {}
def get_last_training_job(config_dir: str = ".") -> str | None:
value = read_state(config_dir).get("last_training_job") def store(config_path: str) -> CliStateStore:
return str(value) if value else None config_dir = str(Path(config_path).parent)
return CliStateStore(config_dir)

3
src/tracking/__init__.py Normal file
View File

@@ -0,0 +1,3 @@
from src.tracking.mlflow import MlflowTracker, NoopTracker, Tracker
__all__ = ["MlflowTracker", "NoopTracker", "Tracker"]

167
src/tracking/mlflow.py Normal file
View File

@@ -0,0 +1,167 @@
from __future__ import annotations
import os
from dataclasses import dataclass
from typing import Any, Protocol
from src.aws import mlflow as aws_mlflow
from src.config import Config, MlflowMode
class Tracker(Protocol):
def start_training_run(self, training_job: Any, *, region: str, profile: str, role_arn: str) -> str | None: ...
def finalize_training_run(
self,
*,
run_id: str | None,
training_job_status: Any,
) -> str | None: ...
@dataclass(frozen=True)
class NoopTracker:
def start_training_run(self, training_job: Any, *, region: str, profile: str, role_arn: str) -> str | None:
return None
def finalize_training_run(self, *, run_id: str | None, training_job_status: Any) -> str | None:
return None
@dataclass(frozen=True)
class MlflowTracker:
mlflow: Any
tracking_uri: str
experiment_name: str
registered_model_name: str
register_trained_models: bool
@classmethod
def from_config(cls, cfg: Config) -> Tracker:
if cfg.mlflow.mode is MlflowMode.disabled:
return NoopTracker()
os.environ.setdefault("MLFLOW_SUPPRESS_PRINTING_URL_TO_STDOUT", "true")
try:
import mlflow
except ImportError as e:
raise RuntimeError(
"MLflow is enabled in config but optional dependencies are not installed. "
"Install with: qc-cli[mlflow]"
) from e
tracking_server_name = cfg.effective_mlflow_tracking_server_name
if not tracking_server_name:
raise RuntimeError("MLflow tracking server name could not be resolved.")
tracking_uri = aws_mlflow.get_tracking_server_arn(
cfg.aws.region,
cfg.aws.profile,
tracking_server_name,
)
mlflow.set_tracking_uri(tracking_uri)
mlflow.set_experiment(cfg.mlflow.experiment_name)
return cls(
mlflow=mlflow,
tracking_uri=tracking_uri,
experiment_name=cfg.mlflow.experiment_name,
registered_model_name=cfg.mlflow.registered_model_name,
register_trained_models=cfg.mlflow.register_trained_models,
)
def start_training_run(self, training_job: Any, *, region: str, profile: str, role_arn: str) -> str | None:
run = self.mlflow.start_run(run_name=training_job.job_name)
run_id = str(run.info.run_id)
params = {
"aws.region": region,
"aws.profile": profile,
"sagemaker.role_arn": role_arn,
"sagemaker.job_name": training_job.job_name,
"sagemaker.training_image": training_job.image_uri,
"sagemaker.instance_type": training_job.instance_type,
"sagemaker.instance_count": training_job.instance_count,
"sagemaker.s3_train_uri": training_job.s3_train_uri,
"sagemaker.s3_output_path": training_job.s3_output_path,
"sagemaker.entry_point": training_job.entry_point,
"sagemaker.source_dir": training_job.source_dir,
}
self._log_params(params)
self._log_params({f"hyperparameters.{key}": value for key, value in training_job.hyperparameters.items()})
self.mlflow.set_tags(
{
"qc_cli.stage": "prerelease",
"qc_cli.command": "train start",
"sagemaker.job_name": training_job.job_name,
}
)
self.mlflow.end_run()
return run_id
def finalize_training_run(self, *, run_id: str | None, training_job_status: Any) -> str | None:
if not run_id:
return None
with self.mlflow.start_run(run_id=run_id):
self._log_params(
{
"sagemaker.training_status": training_job_status.status,
"sagemaker.created_at": training_job_status.created,
"sagemaker.modified_at": training_job_status.modified,
"sagemaker.model_artifacts": training_job_status.model_artifacts,
"sagemaker.failure_reason": training_job_status.failure_reason,
}
)
self._log_final_metrics(training_job_status.raw)
self.mlflow.set_tag("qc_cli.command", "train status")
if training_job_status.status != "Completed" or not training_job_status.model_artifacts:
self.mlflow.set_tag("qc_cli.training_terminal_status", training_job_status.status)
return None
if not self.register_trained_models:
return None
client = self.mlflow.tracking.MlflowClient()
self._ensure_registered_model(client, self.registered_model_name)
version = client.create_model_version(
name=self.registered_model_name,
source=training_job_status.model_artifacts,
run_id=run_id,
tags={
"qc_cli.stage": "prerelease",
"sagemaker.job_name": training_job_status.name,
},
)
version_number = str(version.version)
self._set_alias(client, self.registered_model_name, "prerelease-latest", version_number)
self.mlflow.set_tag("qc_cli.registered_model_name", self.registered_model_name)
self.mlflow.set_tag("qc_cli.registered_model_version", version_number)
return version_number
def _log_params(self, params: dict[str, Any]) -> None:
cleaned = {key: str(value) for key, value in params.items() if value is not None}
if cleaned:
self.mlflow.log_params(cleaned)
def _log_final_metrics(self, training_job: dict[str, Any]) -> None:
metrics = {}
for metric in training_job.get("FinalMetricDataList", []):
name = metric.get("MetricName")
value = metric.get("Value")
if name and value is not None:
metrics[str(name)] = float(value)
if metrics:
self.mlflow.log_metrics(metrics)
def _ensure_registered_model(self, client: Any, name: str) -> None:
try:
client.get_registered_model(name)
except Exception:
client.create_registered_model(name)
def _set_alias(self, client: Any, name: str, alias: str, version: str) -> None:
if hasattr(client, "set_registered_model_alias"):
client.set_registered_model_alias(name, alias, version)

1896
uv.lock generated

File diff suppressed because it is too large Load Diff