17 Commits

Author SHA1 Message Date
f26e8256f0 update ai-hub to first optimize model for Workbench
Remove old examples
2026-06-09 14:55:26 -04:00
6c9f30d290 clean 2026-06-09 12:21:25 -04:00
c6d4da66ae remove 2026-06-09 12:19:17 -04:00
9dc6f478bd add ONNX sanitize required for Ultralytics 2026-06-09 12:18:41 -04:00
samirodr
c2d3f44498 add 2026-06-09 12:06:49 -04:00
46cf2d5afe include steps for ai-hub 2026-06-09 11:55:03 -04:00
98b4d0d200 another one 2026-06-09 10:14:49 -04:00
f1f5dcbed7 update 2026-06-09 10:01:09 -04:00
75f66f81c1 initial version to train yolo model 2026-06-09 09:15:35 -04:00
samirodr
5360a482fc update 2026-06-08 14:59:44 -04:00
samirodr
6a560a8610 match 2026-06-08 14:54:13 -04:00
d244150d98 move mlflow to its own command 2026-06-05 11:47:38 -04:00
d7c7158464 clean main file 2026-06-05 11:25:04 -04:00
6bc25dc183 restructure config to use Device class directly
Also include device validation
2026-06-04 17:28:17 -04:00
samirodr
71a95aa3a7 update description 2026-06-03 17:13:00 -04:00
a3f3060e13 ai-hub (#3)
Reviewed-on: #3
2026-06-03 21:06:06 +00:00
e9ada2612f Mlflow implementation (#2)
Reviewed-on: #2
2026-06-02 19:04:23 +00:00
32 changed files with 3901 additions and 575 deletions

108
README.md
View File

@@ -65,6 +65,18 @@ sagemaker:
entry_point: null # Optional: script inside source_dir entry_point: null # Optional: script inside source_dir
source_dir: null # Optional: local dir packaged and uploaded automatically source_dir: null # Optional: local dir packaged and uploaded automatically
hyperparameters: {} hyperparameters: {}
aihub:
device:
name: Samsung Galaxy S25 (Family)
target_runtime: tflite
input_specs: {} # Required before running qc-cli ai-hub commands
job_name: null # Optional prefix for AI Hub Workbench jobs
model_name: null # Optional name for uploaded local ONNX models
compile_options: null
profile_options: null
quantize_options: null
output_dir: build/qai-hub
``` ```
`qc-cli init` generates the `infra.stack_name` and `s3.bucket` namespace once and writes it to `config.yaml`. Keep these values stable for a deployment; changing them points the CLI at different infrastructure. `qc-cli init` generates the `infra.stack_name` and `s3.bucket` namespace once and writes it to `config.yaml`. Keep these values stable for a deployment; changing them points the CLI at different infrastructure.
@@ -78,9 +90,13 @@ To provision an MLflow tracking server, set:
```yaml ```yaml
mlflow: mlflow:
mode: create mode: create
tracking_server_name: your-tracking-server-name experiment_name: qc-cli-training
registered_model_name: qc-cli-model
register_trained_models: true
``` ```
In `create` mode, the CLI manages the tracking server name from `infra.stack_name`; you do not need to set `tracking_server_name`.
To use an existing MLflow tracking server, set: To use an existing MLflow tracking server, set:
```yaml ```yaml
@@ -89,6 +105,16 @@ mlflow:
tracking_server_name: your-tracking-server-name tracking_server_name: your-tracking-server-name
``` ```
When MLflow is enabled, `train start` creates an MLflow run for the SageMaker job. `train status` finalizes that run once the job reaches a terminal state and registers completed model artifacts as experiment model versions using the `experiment-latest` MLflow alias. An experiment version is an immutable trained-source artifact; it records that training produced a model, not that the model is better than earlier versions or ready for release.
To open the managed SageMaker MLflow UI, request a fresh presigned URL:
```bash
qc-cli mlflow open --config config.yaml
```
This opens a browser to a fresh presigned URL. It works for `mode: create` and for `mode: existing` when the existing server is managed by Amazon SageMaker. In `create` mode, the command uses the CLI-managed tracking server name. In `existing` mode, it uses `mlflow.tracking_server_name`. If the existing MLflow server is external to SageMaker, open it with that server's own URL instead.
## Commands ## Commands
### `init` ### `init`
@@ -99,6 +125,12 @@ qc-cli init --output <path> Write config to a custom path
qc-cli init --force Overwrite an existing config file qc-cli init --force Overwrite an existing config file
``` ```
### `mlflow`
```
qc-cli mlflow open Open a presigned MLflow UI URL in a browser
```
### `infra` ### `infra`
``` ```
@@ -140,6 +172,80 @@ qc-cli train list --limit 3 Show a custom number of recent jobs
The expected output artifact is SageMakers `model.tar.gz`, normally containing the trained model file your container writes to `/opt/ml/model`. The expected output artifact is SageMakers `model.tar.gz`, normally containing the trained model file your container writes to `/opt/ml/model`.
### `ai-hub`
```
qc-cli ai-hub upload <calibration.npz|calibration-dir> <inputs.npz|inputs.npy>
qc-cli ai-hub upload <calibration> <inputs> --from-step validate
qc-cli ai-hub optimize [--onnx-path PATH] [--model-s3-uri URI] [--from-job NAME]
qc-cli ai-hub quantize <calibration.npz|calibration-dir> [--model-id ID] [--onnx-path PATH] [--model-s3-uri URI] [--from-job NAME]
qc-cli ai-hub compile [--model-id ID] [--onnx-path PATH] [--model-s3-uri URI] [--from-job NAME]
qc-cli ai-hub validate <inputs.npz|inputs.npy> [--model-id ID] [--input-name NAME]
qc-cli ai-hub profile [--model-id ID]
qc-cli ai-hub download [--model-id ID] [--output PATH]
```
`ai-hub upload` optimizes to ONNX, quantizes, validates, and profiles. When `aihub.target_runtime` is not `onnx`, it
also compiles the quantized model to that deployment runtime. The initial ONNX optimization gives external models
Workbench provenance and applies compiler optimization passes before quantization.
Resume behavior:
```text
--from-step optimize Run optimize, quantize, optional final compile, validate, and profile.
--from-step quantize Quantize the last optimized ONNX, then optionally compile, validate, and profile.
--from-step compile Skip optimize and quantize; finalize the last quantized model for the target runtime.
--from-step validate Skip optimize, quantize, and compile; validate the last compiled model.
--from-step profile Skip optimize, quantize, compile, and validate; profile the last compiled model.
```
When a step runs in the current command, `upload` passes its returned model ID directly to the next step. When a step is skipped, the next step resolves the needed model ID from `.qc-cli.json`. This avoids re-running earlier AI Hub jobs when you only need to continue from a later step.
`ai-hub optimize` compiles an external model with `--target_runtime onnx`. `ai-hub quantize` uses an explicit
`--model-id`, the last optimized ONNX model, or an explicit/local model source in that order. `ai-hub compile` resolves
model sources in this order: `--model-id`, explicit source options, last quantized model, then the last training job.
For `target_runtime: onnx`, upload treats the quantized ONNX as the final model and skips a redundant second compile.
`ai-hub download` remains separate because downloading is outside the Workbench processing loop.
AI Hub authentication currently uses the local `qai-hub` SDK configuration. A planned follow-up is to support AWS Systems Manager Parameter Store `SecureString` for team-managed tokens, where `config.yaml` stores only a parameter name such as `/qc-cli/aihub/token`, AWS KMS encrypts the token at rest, and the CLI retrieves it at runtime with `ssm:GetParameter` plus `kms:Decrypt` permissions.
## Model lifecycle
The CLI uses neutral experiment naming for trained artifacts and reserves release terminology for an explicit promotion step.
Current behavior:
1. `qc-cli train start` submits a SageMaker training job.
2. `qc-cli train status` finalizes the MLflow run after the job reaches a terminal state.
3. If the job completed and `mlflow.register_trained_models` is enabled, the SageMaker `model.tar.gz` is registered as a new MLflow model version with:
- `qc_cli.stage=experiment`
- `qc_cli.artifact_kind=trained_source`
- `qc_cli.source=sagemaker`
4. The MLflow alias `experiment-latest` points at the most recently registered experiment version.
5. AI Hub upload commands create deployable derived artifacts from a trained-source experiment or local ONNX model.
Future release aliases such as `v1` or `production` can point at a selected deployable artifact.
Example future metadata:
```text
qc-cli-model version 12
qc_cli.stage=experiment
qc_cli.artifact_kind=trained_source
qc_cli.source=sagemaker
qc-cli-model-aihub version 3
qc_cli.stage=ai_hub_compiled
qc_cli.artifact_kind=deployable
qc_cli.parent_registered_model_name=qc-cli-model
qc_cli.parent_model_version=12
qc_cli.runtime=tflite
qc_cli.quantization=int8
qc_cli.target_device=Samsung Galaxy S25
```
In that flow, `experiment-latest` remains a training convenience alias. Release selection is a separate promotion decision based on the derived artifact, not on the experiment name.
## AWS permissions required ## AWS permissions required
The IAM user or role running the CLI needs: The IAM user or role running the CLI needs:

View File

@@ -0,0 +1,266 @@
# YOLO26 Electric Meter Detection Example
This example trains a YOLO26 object detection model on the Roboflow Universe electric meter dataset using the existing `qc-cli` SageMaker training flow.
The workflow is intentionally command driven. Run each step yourself so you can inspect the dataset, update `config.yaml`, and decide when to submit the SageMaker job.
Dataset:
```text
https://universe.roboflow.com/kemals-workspace-kbc8l/electric-meter-detection-o4tfi/dataset/1
```
## Prerequisites
- Install or sync the project dependencies: `uv sync`
- The virtual environment is activated.
- AWS credentials configured for the profile in `config.yaml`
- Infrastructure already deployed with `qc-cli infra setup`
## 1. Download The Dataset
Register or sign in to Roboflow, then open the dataset page:
```text
https://universe.roboflow.com/kemals-workspace-kbc8l/electric-meter-detection-o4tfi/dataset/1
```
Download the dataset in YOLOv26 format from the Roboflow UI, then extract the downloaded archive into:
```text
examples/meter-detection/data/electric-meter-detection
```
The `data.yaml` file should be directly under that folder:
```text
examples/meter-detection/data/electric-meter-detection/data.yaml
```
Do not move `data.yaml` into the `train/` split folder.
After extracting, confirm the dataset has a YOLO data file and image splits:
```bash
find examples/meter-detection/data/electric-meter-detection -maxdepth 2 -type d | sort
find examples/meter-detection/data/electric-meter-detection -name data.yaml -print
```
Open `examples/meter-detection/data/electric-meter-detection/data.yaml` and make sure the split paths are relative to that folder:
```yaml
path: .
train: train/images
val: valid/images
test: test/images
```
If your downloaded dataset does not include a `test/` folder, remove the `test:` line.
The expected layout is similar to:
```text
examples/meter-detection/data/electric-meter-detection/
data.yaml
train/
valid/
test/
```
## 2. Configure SageMaker Training
Update `config.yaml` so the training section points at this example's source directory:
```yaml
sagemaker:
training:
image_uri: 763104351884.dkr.ecr.us-east-1.amazonaws.com/pytorch-training:2.6-cpu-py312-ubuntu22.04-sagemaker-v1
instance_type: ml.g4dn.xlarge
instance_count: 1
source_dir: examples/meter-detection/source
entry_point: train.py
hyperparameters:
model: yolo26n.pt
epochs: 25
imgsz: 640
batch: 16
workers: 2
```
Use `yolo26n.pt` for a lightweight first YOLO26 run. If those weights are unavailable in the installed Ultralytics package, use `yolo11n.pt` as the established fallback:
```yaml
model: yolo11n.pt
```
The `source/requirements.txt` file is installed by the SageMaker PyTorch container before running `train.py`.
For a CPU smoke test, use a CPU instance and reduce the workload:
```yaml
sagemaker:
training:
image_uri: 763104351884.dkr.ecr.us-east-1.amazonaws.com/pytorch-training:2.6-cpu-py312-ubuntu22.04-sagemaker-v1
instance_type: ml.m4.xlarge
instance_count: 1
source_dir: examples/meter-detection/source
entry_point: train.py
hyperparameters:
model: yolo26n.pt
epochs: 1
imgsz: 320
batch: 4
workers: 2
```
## 3. Check Infrastructure
Confirm the CLI can see the configured SageMaker role and S3 bucket:
```bash
qc-cli infra status
```
## 4. Upload The Dataset
Upload the downloaded Roboflow dataset to the `s3.data_prefix` configured in `config.yaml`:
```bash
qc-cli upload examples/meter-detection/data/electric-meter-detection
```
Directory uploads preserve paths relative to the uploaded directory, so SageMaker receives the dataset root with `data.yaml` plus the split directories.
In SageMaker, this uploaded dataset root is mounted at `/opt/ml/input/data/train`. That `train` path is the SageMaker channel name, not the YOLO `train/` split folder.
## 5. Start Training
Submit the SageMaker training job:
```bash
qc-cli train start
```
The command prints the submitted SageMaker job name. Check progress with:
```bash
qc-cli train status
```
Or pass the job name explicitly:
```bash
qc-cli train status qc-cli-YYYYMMDD-HHMMSS
```
## SageMaker Outputs
When the job completes, SageMaker packages the files written under `/opt/ml/model` into `model.tar.gz`.
This example writes:
```text
best.pt
model.onnx
metrics.json
```
The archive is stored under the configured `s3.model_prefix`.
## 6. Configure Qualcomm AI Hub
Authenticate with Qualcomm AI Hub:
```bash
qai-hub configure --api_token
```
Add AI Hub settings to `config.yaml`. The input name and image size must match the ONNX model exported by this example:
```yaml
aihub:
device:
name: Dragonwing IQ-9075 EVK
target_runtime: onnx
input_specs:
images: [[1, 3, 640, 640], float32]
job_name: meter-detection
model_name: meter-detection
output_dir: build/qai-hub/meter-detection
```
The ONNX graph is the source of truth. The export normally uses the same value as `sagemaker.training.hyperparameters.imgsz`, but changing `config.yaml` after training does not resize an existing model. For example, a model exported with `imgsz: 320` requires `images: [[1, 3, 320, 320], float32]`.
## 7. Prepare AI Hub Inputs
Generate calibration samples and a validation input from the downloaded dataset:
```bash
uv run python examples/meter-detection/prepare_aihub_inputs.py --image-size 640
```
This writes:
```text
examples/meter-detection/data/aihub_calibration/*.npy
examples/meter-detection/data/inputs.npz
```
The script applies the preprocessing expected by the exported YOLO model: aspect-ratio-preserving letterboxing, RGB channel order, channel-first layout, and pixel values normalized to `[0, 1]`.
## 8. Upload To Qualcomm AI Hub
Use the SageMaker job name printed by `qc-cli train start`:
```bash
qc-cli ai-hub upload \
examples/meter-detection/data/aihub_calibration \
examples/meter-detection/data/inputs.npz \
--from-job qc-cli-YYYYMMDD-HHMMSS
```
The command downloads the job's `model.tar.gz`, finds `model.onnx`, and runs the following AI Hub workflow:
1. Compile the external ONNX to a Workbench-optimized ONNX model.
2. Quantize the optimized ONNX model.
3. Compile the quantized model when the configured deployment runtime is not `onnx`.
4. Validate and profile the final model.
The training example sanitizes the Ultralytics ONNX export before saving `model.onnx`. This removes graph input or output names, such as `output0`, that are duplicated in the ONNX `value_info` metadata and rejected by AI Hub.
For a model already downloaded by a failed upload attempt, sanitize the extracted ONNX file and retry using the local model. Replace the job name in both paths:
```bash
uv run --with onnx python examples/meter-detection/source/sanitize_onnx.py \
build/qai-hub/meter-detection/qc-cli-YYYYMMDD-HHMMSS/source/extracted/model.onnx \
--output build/qai-hub/meter-detection/model.aihub.onnx
qc-cli ai-hub upload \
examples/meter-detection/data/aihub_calibration \
examples/meter-detection/data/inputs.npz \
--onnx-path build/qai-hub/meter-detection/model.aihub.onnx
```
Download the compiled artifact after the workflow completes:
```bash
qc-cli ai-hub download --output build/qai-hub/meter-detection/model.tflite
```
## Training Hyperparameters
Values under `sagemaker.training.hyperparameters` are passed to `source/train.py` as command-line arguments.
| Name | Type | Default | Description |
|---|---:|---:|---|
| `model` | string | `yolo26n.pt` | Ultralytics model weights or model YAML. |
| `epochs` | int | `25` | Number of training epochs. |
| `imgsz` | int | `640` | Square training image size. |
| `batch` | int | `16` | Images per training batch. |
| `workers` | int | `2` | DataLoader worker count. |
| `patience` | int | `20` | Early stopping patience. |
| `device` | string | auto | Optional Ultralytics device value such as `0` or `cpu`. |
| `data-yaml` | string | auto | Optional path to `data.yaml`; normally discovered from the uploaded dataset root. |
| `dataset-dir` | string | `SM_CHANNEL_TRAIN` | Uploaded dataset root mounted by SageMaker. |
Do not set `dataset-dir` or `model-dir` in normal SageMaker runs. SageMaker sets those automatically through `SM_CHANNEL_TRAIN` and `SM_MODEL_DIR`.

View File

@@ -0,0 +1,92 @@
#!/usr/bin/env python3
"""Prepare Qualcomm AI Hub calibration and validation inputs for the meter detector."""
from __future__ import annotations
import argparse
from pathlib import Path
import numpy as np
from PIL import Image
IMAGE_EXTENSIONS = {".jpg", ".jpeg", ".png"}
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument(
"--dataset-dir",
type=Path,
default=Path("examples/meter-detection/data/electric-meter-detection"),
help="Root of the extracted Roboflow dataset.",
)
parser.add_argument(
"--calibration-dir",
type=Path,
default=Path("examples/meter-detection/data/aihub_calibration"),
help="Directory where .npy calibration samples will be written.",
)
parser.add_argument(
"--input-file",
type=Path,
default=Path("examples/meter-detection/data/inputs.npz"),
help="Validation .npz input file for qc-cli ai-hub validate.",
)
parser.add_argument("--input-name", default="images", help="ONNX input name.")
parser.add_argument("--image-size", type=int, default=640, help="Square image size used for ONNX export.")
parser.add_argument("--samples", type=int, default=16, help="Number of calibration samples to write.")
return parser.parse_args()
def preprocess_image(path: Path, image_size: int) -> np.ndarray:
"""Apply Ultralytics-style letterboxing and produce an NCHW float32 tensor."""
with Image.open(path) as source:
image = source.convert("RGB")
scale = min(image_size / image.width, image_size / image.height)
resized_width = round(image.width * scale)
resized_height = round(image.height * scale)
image = image.resize((resized_width, resized_height), Image.Resampling.BILINEAR)
canvas = Image.new("RGB", (image_size, image_size), (114, 114, 114))
left = round((image_size - resized_width) / 2 - 0.1)
top = round((image_size - resized_height) / 2 - 0.1)
canvas.paste(image, (left, top))
array = np.asarray(canvas, dtype=np.float32) / 255.0
return np.transpose(array, (2, 0, 1))[None, ...].astype(np.float32)
def main() -> None:
args = parse_args()
if args.image_size < 1:
raise SystemExit("--image-size must be at least 1")
if args.samples < 1:
raise SystemExit("--samples must be at least 1")
images = sorted(
path
for path in args.dataset_dir.rglob("*")
if path.is_file() and path.suffix.lower() in IMAGE_EXTENSIONS and path.parent.name == "images"
)
if not images:
raise SystemExit(f"No images found under {args.dataset_dir}")
args.calibration_dir.mkdir(parents=True, exist_ok=True)
args.input_file.parent.mkdir(parents=True, exist_ok=True)
for stale_sample in args.calibration_dir.glob("sample_*.npy"):
stale_sample.unlink()
prepared: list[np.ndarray] = []
for index, image_path in enumerate(images[: args.samples]):
sample = preprocess_image(image_path, args.image_size)
np.save(args.calibration_dir / f"sample_{index:03d}.npy", sample)
prepared.append(sample)
np.savez(args.input_file, **{args.input_name: prepared[0]}) # pyright: ignore[reportArgumentType]
print(f"Wrote {len(prepared)} calibration samples to {args.calibration_dir}")
print(f"Wrote validation input to {args.input_file}")
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,3 @@
ultralytics>=8.3.0
pyyaml>=6.0.3
onnx>=1.16.0

View File

@@ -0,0 +1,38 @@
#!/usr/bin/env python3
"""Remove ONNX value_info entries that duplicate graph inputs or outputs."""
from __future__ import annotations
import argparse
from pathlib import Path
import onnx # type: ignore[reportMissingImports]
def sanitize_onnx(path: Path, output_path: Path | None = None) -> Path:
model = onnx.load(path)
io_names = {value.name for value in (*model.graph.input, *model.graph.output)}
retained_value_info = [value for value in model.graph.value_info if value.name not in io_names]
destination = output_path or path
if len(retained_value_info) != len(model.graph.value_info):
del model.graph.value_info[:]
model.graph.value_info.extend(retained_value_info)
destination.parent.mkdir(parents=True, exist_ok=True)
onnx.save(model, destination)
return destination
def main() -> None:
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument("onnx_path", type=Path)
parser.add_argument("--output", type=Path)
args = parser.parse_args()
written = sanitize_onnx(args.onnx_path, args.output)
print(f"Saved sanitized ONNX model to {written}")
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,126 @@
#!/usr/bin/env python3
"""SageMaker entry point for YOLO electric meter detection training."""
from __future__ import annotations
import argparse
import json
import os
import shutil
from pathlib import Path
from typing import Any
import yaml
from sanitize_onnx import sanitize_onnx
from ultralytics import YOLO # type: ignore[reportMissingImports]
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser()
parser.add_argument("--model", default="yolo26n.pt")
parser.add_argument("--epochs", type=int, default=25)
parser.add_argument("--imgsz", type=int, default=640)
parser.add_argument("--batch", type=int, default=16)
parser.add_argument("--workers", type=int, default=2)
parser.add_argument("--patience", type=int, default=20)
parser.add_argument("--device", default=None)
parser.add_argument("--data-yaml", default=None)
parser.add_argument("--dataset-dir", default=os.environ.get("SM_CHANNEL_TRAIN", "/opt/ml/input/data/train"))
parser.add_argument("--train-dir", dest="dataset_dir", help=argparse.SUPPRESS)
parser.add_argument("--model-dir", default=os.environ.get("SM_MODEL_DIR", "/opt/ml/model"))
return parser.parse_args()
def find_data_yaml(dataset_dir: Path, explicit_path: str | None) -> Path:
if explicit_path:
data_yaml = Path(explicit_path)
if data_yaml.is_file():
return data_yaml
raise FileNotFoundError(f"Configured data.yaml does not exist: {data_yaml}")
matches = sorted(dataset_dir.rglob("data.yaml"))
if not matches:
raise FileNotFoundError(f"Could not find data.yaml under {dataset_dir}")
if len(matches) > 1:
print(f"Found multiple data.yaml files; using {matches[0]}")
return matches[0]
def prepare_data_yaml(data_yaml: Path) -> Path:
"""Write a SageMaker-local data file rooted at the uploaded dataset."""
dataset_root = data_yaml.parent
data = yaml.safe_load(data_yaml.read_text(encoding="utf-8"))
if not isinstance(data, dict):
raise ValueError(f"Expected a mapping in {data_yaml}")
normalized = dict(data)
normalized["path"] = str(dataset_root)
if "val" not in normalized and "valid" in normalized:
normalized["val"] = normalized.pop("valid")
prepared_path = dataset_root / "data.sagemaker.yaml"
prepared_path.write_text(yaml.safe_dump(normalized, sort_keys=False), encoding="utf-8")
print(f"Prepared dataset config: {prepared_path}")
return prepared_path
def copy_if_exists(source: Path, destination: Path) -> None:
if source.exists():
shutil.copy2(source, destination)
print(f"Saved {destination}")
def main() -> None:
args = parse_args()
dataset_dir = Path(args.dataset_dir)
model_dir = Path(args.model_dir)
model_dir.mkdir(parents=True, exist_ok=True)
data_yaml = prepare_data_yaml(find_data_yaml(dataset_dir, args.data_yaml))
model = YOLO(args.model)
train_kwargs: dict[str, Any] = {
"data": str(data_yaml),
"epochs": args.epochs,
"imgsz": args.imgsz,
"batch": args.batch,
"workers": args.workers,
"patience": args.patience,
"project": str(model_dir / "runs"),
"name": "train",
"exist_ok": True,
}
if args.device:
train_kwargs["device"] = args.device
results = model.train(**train_kwargs)
save_dir = Path(results.save_dir)
best_pt = save_dir / "weights" / "best.pt"
last_pt = save_dir / "weights" / "last.pt"
trained_weights = best_pt if best_pt.exists() else last_pt
if not trained_weights.exists():
raise FileNotFoundError(f"Could not find trained weights in {save_dir / 'weights'}")
copy_if_exists(trained_weights, model_dir / "best.pt")
trained_model = YOLO(str(trained_weights))
onnx_path = Path(trained_model.export(format="onnx", imgsz=args.imgsz))
saved_onnx_path = sanitize_onnx(onnx_path, model_dir / "model.onnx")
print(f"Saved {saved_onnx_path}")
metrics = {
"model": args.model,
"epochs": args.epochs,
"imgsz": args.imgsz,
"batch": args.batch,
"workers": args.workers,
"patience": args.patience,
"data_yaml": str(data_yaml),
"weights": str(trained_weights),
"onnx": str(saved_onnx_path),
}
(model_dir / "metrics.json").write_text(json.dumps(metrics, indent=2), encoding="utf-8")
print(f"Saved model artifacts to {model_dir}")
if __name__ == "__main__":
main()

View File

@@ -1,89 +0,0 @@
# SageMaker Training Example
This example downloads a small image-classification dataset, uploads it through `qc-cli`, and submits a live SageMaker training job.
## Prerequisites
- AWS credentials configured for the profile in `config.yaml`
- Infrastructure already deployed with `qc-cli infra setup`
- `config.yaml` updated with:
```yaml
s3:
bucket: your-bucket-name
sagemaker:
training:
image_uri: 763104351884.dkr.ecr.us-east-1.amazonaws.com/pytorch-training:2.6-cpu-py312-ubuntu22.04-sagemaker-v1
instance_type: ml.m4.xlarge
instance_count: 1
source_dir: examples/training/source
entry_point: train.py
hyperparameters:
epochs: 1
batch-size: 32
learning-rate: 0.001
image-size: 160
validation-split: 0.2
```
## Training Hyperparameters
Values under `sagemaker.training.hyperparameters` are passed to the training entry point as command-line arguments. For this example, they map to arguments defined in [source/train.py](source/train.py).
Supported by this example:
| Name | Type | Default | Description |
|---|---:|---:|---|
| `epochs` | int | `1` | Number of training epochs. |
| `batch-size` | int | `32` | Images per training batch. |
| `learning-rate` | float | `0.001` | Adam optimizer learning rate. |
| `image-size` | int | `160` | Resize images to square `image-size x image-size`. |
| `validation-split` | float | `0.2` | Fraction of data used for validation. |
| `max-samples` | int | `0` | Optional cap for smoke tests; `0` means use all images. |
| `seed` | int | `13` | Random seed for reproducible splitting. |
| `num-workers` | int | `2` | DataLoader worker count. |
Do not set `train-dir` or `model-dir` in normal SageMaker runs. SageMaker sets those automatically through `SM_CHANNEL_TRAIN` and `SM_MODEL_DIR`.
## 1. Download The Dataset
```bash
bash examples/training/download_flower_photos.sh
```
This creates:
```text
examples/training/data/flower_photos_sagemaker/
daisy/
dandelion/
roses/
sunflowers/
tulips/
```
## 2. Run Training
Run the training script and wait until it finishes:
```bash
bash examples/training/run_training.sh --config config.yaml --wait
```
Use a dataset that is already uploaded to `s3.data_prefix`:
```bash
bash examples/training/run_training.sh \
--config config.yaml \
--skip-upload \
--wait
```
## Notes
- The default dataset path is `examples/training/data/flower_photos_sagemaker`.
- Uploaded data uses the `s3.bucket` and `s3.data_prefix` values from `config.yaml`.
- Training artifacts are written under `s3://<bucket>/<model_prefix>/`.
- The SageMaker `model.tar.gz` contains `model.onnx`, `model.pt`, `class_to_idx.json`, and `metrics.json`.
- SageMaker packages `examples/training/source`, installs `requirements.txt`, and runs `train.py`.

View File

@@ -1,40 +0,0 @@
#!/usr/bin/env bash
set -euo pipefail
DATASET_URL="https://storage.googleapis.com/download.tensorflow.org/example_images/flower_photos.tgz"
DEST_DIR="${1:-examples/training/data}"
ARCHIVE_PATH="${DEST_DIR}/flower_photos.tgz"
RAW_DATASET_DIR="${DEST_DIR}/flower_photos"
DATASET_DIR="${DEST_DIR}/flower_photos_sagemaker"
CLASS_NAMES=("daisy" "dandelion" "roses" "sunflowers" "tulips")
mkdir -p "${DEST_DIR}"
if [[ -d "${DATASET_DIR}" ]]; then
echo "Dataset already exists: ${DATASET_DIR}"
echo "Use this path with run_training.py:"
echo " ${DATASET_DIR}"
exit 0
fi
echo "Downloading TensorFlow flower_photos dataset..."
if command -v curl >/dev/null 2>&1; then
curl -L "${DATASET_URL}" -o "${ARCHIVE_PATH}"
elif command -v wget >/dev/null 2>&1; then
wget -O "${ARCHIVE_PATH}" "${DATASET_URL}"
else
echo "Either curl or wget is required." >&2
exit 1
fi
echo "Extracting dataset..."
tar -xzf "${ARCHIVE_PATH}" -C "${DEST_DIR}"
echo "Preparing SageMaker directory layout..."
mkdir -p "${DATASET_DIR}"
for class_name in "${CLASS_NAMES[@]}"; do
cp -R "${RAW_DATASET_DIR}/${class_name}" "${DATASET_DIR}/${class_name}"
done
echo "Dataset ready: ${DATASET_DIR}"
find "${DATASET_DIR}" -mindepth 1 -maxdepth 1 -type d -print | sort

View File

@@ -1,111 +0,0 @@
#!/usr/bin/env bash
set -euo pipefail
CONFIG_PATH="config.yaml"
DATASET_DIR="examples/training/data/flower_photos_sagemaker"
WAIT=false
SKIP_UPLOAD=false
POLL_SECONDS=60
usage() {
cat <<EOF
Usage: $0 [options]
Options:
--config PATH Path to qc-cli config file. Default: config.yaml
--dataset-dir PATH Dataset directory to upload. Default: ${DATASET_DIR}
--skip-upload Train against data already uploaded to s3.data_prefix.
--wait Poll until training completes.
-h, --help Show this help.
EOF
}
while [[ $# -gt 0 ]]; do
case "$1" in
--config)
CONFIG_PATH="$2"
shift 2
;;
--dataset-dir)
DATASET_DIR="$2"
shift 2
;;
--skip-upload)
SKIP_UPLOAD=true
shift
;;
--wait)
WAIT=true
shift
;;
-h|--help)
usage
exit 0
;;
*)
echo "Unknown option: $1" >&2
usage >&2
exit 1
;;
esac
done
if [[ ! -f "${CONFIG_PATH}" ]]; then
echo "Config not found: ${CONFIG_PATH}" >&2
exit 1
fi
if [[ "${SKIP_UPLOAD}" == false && ! -d "${DATASET_DIR}" ]]; then
echo "Dataset not found: ${DATASET_DIR}" >&2
echo "Run: bash examples/training/download_flower_photos.sh" >&2
exit 1
fi
run() {
echo "+ $*"
"$@"
}
run uv run qc-cli infra status --config "${CONFIG_PATH}"
if [[ "${SKIP_UPLOAD}" == false ]]; then
run uv run qc-cli upload "${DATASET_DIR}" --config "${CONFIG_PATH}"
fi
TRAIN_OUTPUT="$(uv run qc-cli train start --config "${CONFIG_PATH}")"
echo "${TRAIN_OUTPUT}"
JOB_NAME="$(printf '%s\n' "${TRAIN_OUTPUT}" | grep -Eo 'qc-cli-[0-9]{8}-[0-9]{6}' | tail -n 1)"
if [[ -z "${JOB_NAME}" ]]; then
echo "Could not find training job name in qc-cli output." >&2
exit 1
fi
echo "Submitted SageMaker training job: ${JOB_NAME}"
if [[ "${WAIT}" == false ]]; then
run uv run qc-cli train status "${JOB_NAME}" --config "${CONFIG_PATH}"
exit 0
fi
while true; do
STATUS_OUTPUT="$(uv run qc-cli train status "${JOB_NAME}" --config "${CONFIG_PATH}")"
echo "${STATUS_OUTPUT}"
if printf '%s\n' "${STATUS_OUTPUT}" | grep -q 'Status:.*Completed'; then
echo "Training completed successfully."
exit 0
fi
if printf '%s\n' "${STATUS_OUTPUT}" | grep -q 'Status:.*Failed'; then
echo "Training failed." >&2
exit 1
fi
if printf '%s\n' "${STATUS_OUTPUT}" | grep -q 'Status:.*Stopped'; then
echo "Training stopped." >&2
exit 1
fi
sleep "${POLL_SECONDS}"
done

View File

@@ -1 +0,0 @@
onnx==1.21.0

View File

@@ -1,192 +0,0 @@
#!/usr/bin/env python3
"""SageMaker entry point for CPU image-classification training."""
from __future__ import annotations
import argparse
import json
import os
import random
from pathlib import Path
import torch
from torch import nn
from torch.utils.data import DataLoader, Subset, random_split
from torchvision import datasets, transforms
class SmallImageClassifier(nn.Module):
def __init__(self, class_count: int) -> None:
super().__init__()
self.features = nn.Sequential(
nn.Conv2d(3, 16, kernel_size=3, padding=1),
nn.ReLU(inplace=True),
nn.MaxPool2d(2),
nn.Conv2d(16, 32, kernel_size=3, padding=1),
nn.ReLU(inplace=True),
nn.MaxPool2d(2),
nn.Conv2d(32, 64, kernel_size=3, padding=1),
nn.ReLU(inplace=True),
nn.MaxPool2d(2),
nn.AdaptiveAvgPool2d((1, 1)),
)
self.classifier = nn.Linear(64, class_count)
def forward(self, x: torch.Tensor) -> torch.Tensor:
x = self.features(x)
x = torch.flatten(x, 1)
return self.classifier(x)
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser()
parser.add_argument("--epochs", type=int, default=1)
parser.add_argument("--batch-size", type=int, default=32)
parser.add_argument("--learning-rate", type=float, default=0.001)
parser.add_argument("--image-size", type=int, default=160)
parser.add_argument("--validation-split", type=float, default=0.2)
parser.add_argument("--max-samples", type=int, default=0)
parser.add_argument("--seed", type=int, default=13)
parser.add_argument("--num-workers", type=int, default=2)
parser.add_argument("--train-dir", default=os.environ.get("SM_CHANNEL_TRAIN", "/opt/ml/input/data/train"))
parser.add_argument("--model-dir", default=os.environ.get("SM_MODEL_DIR", "/opt/ml/model"))
return parser.parse_args()
def build_datasets(args: argparse.Namespace) -> tuple[Subset, Subset, dict[str, int]]:
transform = transforms.Compose(
[
transforms.Resize((args.image_size, args.image_size)),
transforms.ToTensor(),
transforms.Normalize(mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225)),
]
)
dataset = datasets.ImageFolder(args.train_dir, transform=transform)
if len(dataset.classes) < 2:
raise ValueError(f"Expected at least two classes in {args.train_dir}. Found: {dataset.classes}")
if args.max_samples > 0 and args.max_samples < len(dataset):
indices = list(range(len(dataset)))
random.Random(args.seed).shuffle(indices)
dataset = Subset(dataset, indices[: args.max_samples])
validation_size = max(1, int(len(dataset) * args.validation_split))
train_size = len(dataset) - validation_size
if train_size < 1:
raise ValueError("Not enough images to create a train/validation split.")
generator = torch.Generator().manual_seed(args.seed)
train_dataset, validation_dataset = random_split(dataset, [train_size, validation_size], generator=generator)
return train_dataset, validation_dataset, getattr(dataset, "dataset", dataset).class_to_idx
def run_epoch(
model: nn.Module,
data_loader: DataLoader,
criterion: nn.Module,
optimizer: torch.optim.Optimizer | None,
device: torch.device,
) -> tuple[float, float]:
training = optimizer is not None
model.train(training)
total_loss = 0.0
total_correct = 0
total_examples = 0
for images, labels in data_loader:
images = images.to(device)
labels = labels.to(device)
with torch.set_grad_enabled(training):
logits = model(images)
loss = criterion(logits, labels)
if training:
optimizer.zero_grad()
loss.backward()
optimizer.step()
total_loss += loss.item() * images.size(0)
total_correct += (logits.argmax(dim=1) == labels).sum().item()
total_examples += images.size(0)
return total_loss / total_examples, total_correct / total_examples
def export_onnx(model: nn.Module, model_dir: Path, image_size: int) -> None:
model.eval()
dummy_input = torch.randn(1, 3, image_size, image_size)
torch.onnx.export(
model,
dummy_input,
model_dir / "model.onnx",
export_params=True,
opset_version=17,
do_constant_folding=True,
input_names=["input"],
output_names=["logits"],
dynamic_axes={
"input": {0: "batch_size"},
"logits": {0: "batch_size"},
},
)
def main() -> None:
args = parse_args()
random.seed(args.seed)
torch.manual_seed(args.seed)
train_dataset, validation_dataset, class_to_idx = build_datasets(args)
train_loader = DataLoader(
train_dataset,
batch_size=args.batch_size,
shuffle=True,
num_workers=args.num_workers,
)
validation_loader = DataLoader(
validation_dataset,
batch_size=args.batch_size,
shuffle=False,
num_workers=args.num_workers,
)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = SmallImageClassifier(class_count=len(class_to_idx)).to(device)
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=args.learning_rate)
print(f"Training on {device}. Classes: {sorted(class_to_idx)}")
metrics = []
for epoch in range(1, args.epochs + 1):
train_loss, train_accuracy = run_epoch(model, train_loader, criterion, optimizer, device)
validation_loss, validation_accuracy = run_epoch(model, validation_loader, criterion, None, device)
epoch_metrics = {
"epoch": epoch,
"train_loss": train_loss,
"train_accuracy": train_accuracy,
"validation_loss": validation_loss,
"validation_accuracy": validation_accuracy,
}
metrics.append(epoch_metrics)
print(json.dumps(epoch_metrics, sort_keys=True))
model_dir = Path(args.model_dir)
model_dir.mkdir(parents=True, exist_ok=True)
torch.save(
{
"model_state_dict": model.cpu().state_dict(),
"class_to_idx": class_to_idx,
"image_size": args.image_size,
},
model_dir / "model.pt",
)
export_onnx(model, model_dir, args.image_size)
(model_dir / "class_to_idx.json").write_text(json.dumps(class_to_idx, indent=2), encoding="utf-8")
(model_dir / "metrics.json").write_text(json.dumps(metrics, indent=2), encoding="utf-8")
print(f"Saved model artifacts to {model_dir}")
if __name__ == "__main__":
main()

View File

@@ -5,15 +5,19 @@ build-backend = "hatchling.build"
[project] [project]
name = "qc-cli" name = "qc-cli"
version = "0.1.0" version = "0.1.0"
description = "CLI for SageMaker ONNX training and Qualcomm AI Hub optimization" description = "CLI for training and deploying models for Qualcomm AI Hub"
requires-python = ">=3.13" requires-python = ">=3.13"
dependencies = [ dependencies = [
"aws-cdk-lib>=2.180.0", "aws-cdk-lib>=2.180.0",
"typer==0.25.0", "typer==0.25.0",
"boto3>=1.34,<1.42", "boto3>=1.34,<1.42",
"constructs>=10.0.0", "constructs>=10.0.0",
"mlflow>=3.0",
"numpy>=1.26",
"pydantic>=2.13.3", "pydantic>=2.13.3",
"pyyaml>=6.0.3", "pyyaml>=6.0.3",
"qai-hub>=0.49.0",
"sagemaker-mlflow>=0.4.0",
] ]
[project.scripts] [project.scripts]

View File

@@ -17,3 +17,20 @@ def describe_tracking_server(region: str, profile: str, name: str) -> dict[str,
): ):
return None return None
raise raise
def get_tracking_server_arn(region: str, profile: str, name: str) -> str:
server = describe_tracking_server(region, profile, name)
if not server:
raise ValueError(f"MLflow tracking server not found: {name}")
arn = server.get("TrackingServerArn")
if not arn:
raise ValueError(f"MLflow tracking server has no ARN: {name}")
return str(arn)
def create_presigned_tracking_server_url(region: str, profile: str, name: str) -> str:
client = boto3.Session(profile_name=profile, region_name=region).client("sagemaker")
response = client.create_presigned_mlflow_tracking_server_url(TrackingServerName=name)
return str(response["AuthorizedUrl"])

View File

@@ -21,6 +21,24 @@ def upload_file(
return f"s3://{bucket}/{s3_key}" return f"s3://{bucket}/{s3_key}"
def download_file(
region: str,
profile: str,
s3_uri: str,
local_path: str,
) -> str:
if not s3_uri.startswith("s3://"):
raise ValueError(f"Expected S3 URI, got: {s3_uri}")
bucket_key = s3_uri.removeprefix("s3://")
bucket, _, key = bucket_key.partition("/")
if not bucket or not key:
raise ValueError(f"Expected S3 URI with bucket and key, got: {s3_uri}")
dest = Path(local_path)
dest.parent.mkdir(parents=True, exist_ok=True)
_client(region, profile).download_file(bucket, key, str(dest))
return str(dest)
def upload_dir( def upload_dir(
region: str, region: str,
profile: str, profile: str,

View File

@@ -36,6 +36,7 @@ class TrainingJobStatus:
modified: datetime | None modified: datetime | None
model_artifacts: str | None model_artifacts: str | None
failure_reason: str | None failure_reason: str | None
raw: dict[str, Any] = field(default_factory=dict)
def _sm(session: Boto3SessionKwargs) -> SageMakerClient: def _sm(session: Boto3SessionKwargs) -> SageMakerClient:
@@ -116,9 +117,20 @@ def get_training_job_status(session: Boto3SessionKwargs, job_name: str) -> Train
modified=resp.get("LastModifiedTime"), modified=resp.get("LastModifiedTime"),
model_artifacts=resp.get("ModelArtifacts", {}).get("S3ModelArtifacts"), model_artifacts=resp.get("ModelArtifacts", {}).get("S3ModelArtifacts"),
failure_reason=resp.get("FailureReason"), failure_reason=resp.get("FailureReason"),
raw=dict(resp),
) )
def get_model_artifacts(region: str, profile: str, job_name: str) -> str:
resp = boto3.Session(profile_name=profile, region_name=region).client("sagemaker").describe_training_job(
TrainingJobName=job_name
)
artifact = resp.get("ModelArtifacts", {}).get("S3ModelArtifacts")
if not artifact:
raise RuntimeError(f"Training job '{job_name}' does not have model artifacts yet.")
return str(artifact)
def list_training_jobs( def list_training_jobs(
session: Boto3SessionKwargs, session: Boto3SessionKwargs,
max_results: int = 10, max_results: int = 10,

0
src/cloud/__init__.py Normal file
View File

567
src/commands/ai_hub.py Normal file
View File

@@ -0,0 +1,567 @@
from collections.abc import Mapping, Sequence
from dataclasses import dataclass
from datetime import datetime
from enum import StrEnum
from pathlib import Path
from typing import Any
import qai_hub.hub as hub
import typer
from qai_hub.client import Device
from src import state as state_ops
from src.commands.utils import CONFIG_OPT, CONSOLE, load_cfg
from src.config import Config
from src.qualcomm import aihub_jobs
from src.qualcomm.artifacts import ResolvedOnnx, resolve_onnx
app = typer.Typer(help="Optimize, quantize, compile, validate, profile, and download models with Qualcomm Workbench")
_RUNTIME_EXTENSIONS = {
"tflite": "tflite",
"qnn_context_binary": "bin",
"onnx": "onnx",
}
class UploadStep(StrEnum):
optimize = "optimize"
quantize = "quantize"
compile = "compile"
validate = "validate"
profile = "profile"
@dataclass(frozen=True)
class ResolvedModelSource:
model: str | Path
model_artifact: str | None = None
def _input_specs(cfg: Config) -> dict[str, tuple[tuple[int, ...], str]]:
specs = {name: (tuple(shape), dtype) for name, (shape, dtype) in cfg.aihub.input_specs.items()}
if not specs:
CONSOLE.print("[red]aihub.input_specs must define at least one input.[/red]")
raise typer.Exit(1)
return specs
def _load_inputs(
input_file: Path,
specs: Mapping[str, tuple[Sequence[int], str]],
input_name: str | None = None,
) -> dict[str, Any]:
import numpy as np
if not input_file.exists():
raise FileNotFoundError(f"File not found: {input_file}")
if input_file.suffix == ".npz":
loaded = np.load(input_file)
missing = set(specs) - set(loaded.files)
if missing:
raise ValueError(f"Missing input(s) in NPZ: {', '.join(sorted(missing))}")
return {name: loaded[name] for name in specs}
if input_file.suffix == ".npy":
if input_name is None:
if len(specs) != 1:
raise ValueError("--input-name is required when config has multiple inputs")
input_name = next(iter(specs))
if input_name not in specs:
raise ValueError(f"Input name '{input_name}' is not defined in aihub.input_specs")
return {input_name: np.load(input_file)}
raise ValueError("Input file must be .npz or .npy")
def _load_calibration(path: Path, specs: Mapping[str, tuple[Sequence[int], str]]) -> dict[str, Any]:
import numpy as np
if path.is_file():
return _load_inputs(path, specs)
if not path.is_dir():
raise FileNotFoundError(f"Calibration path not found: {path}")
if len(specs) != 1:
raise ValueError("Directory calibration data is supported only for single-input models.")
input_name = next(iter(specs))
samples = [np.load(p) for p in sorted(path.glob("*.npy"))]
if not samples:
raise ValueError(f"No .npy calibration samples found in {path}")
return {input_name: samples}
def _job_name(cfg: Config, operation: str) -> str | None:
if not cfg.aihub.job_name:
return None
return f"{cfg.aihub.job_name}-{operation}"
def _model_id_or_state(config_path: str, model_id: str | None, *, quantized: bool = False) -> str:
st = state_ops.store(config_path)
resolved = model_id or (st.get_last_quantized_model_id() if quantized else st.get_last_compiled_model_id())
if not resolved:
source = "quantized" if quantized else "compiled"
CONSOLE.print(f"[red]No {source} model found. Pass --model-id or run the previous AI Hub step first.[/red]")
raise typer.Exit(1)
return resolved
def _resolve_model_source(
cfg: Config,
config_path: str,
*,
model_id: str | None = None,
previous_model_id: str | None = None,
from_job: str | None = None,
model_s3_uri: str | None = None,
onnx_path: str | None = None,
) -> ResolvedModelSource:
if model_id:
return ResolvedModelSource(model_id)
has_explicit_source = bool(from_job or model_s3_uri or onnx_path)
if previous_model_id and not has_explicit_source:
return ResolvedModelSource(previous_model_id)
resolved = _resolve_onnx_source(
cfg,
config_path,
from_job=from_job,
model_s3_uri=model_s3_uri,
onnx_path=onnx_path,
)
return ResolvedModelSource(resolved.onnx_path, resolved.model_artifact)
def _resolve_onnx_source(
cfg: Config,
config_path: str,
*,
from_job: str | None = None,
model_s3_uri: str | None = None,
onnx_path: str | None = None,
) -> ResolvedOnnx:
st = state_ops.store(config_path)
last_training_job = st.get_last_training_job()
saved_model_artifact = None
if not from_job and not model_s3_uri and not onnx_path and not last_training_job:
saved_model_artifact = st.get_last_model_artifact()
return resolve_onnx(
cfg=cfg,
output_dir=cfg.aihub.output_dir,
from_job=from_job,
model_s3_uri=model_s3_uri or saved_model_artifact,
onnx_path=onnx_path,
last_training_job=last_training_job,
)
def _device_selector(device: Device) -> str:
parts: list[str] = []
if device.name:
parts.append(f"name={device.name!r}")
if device.os:
parts.append(f"os={device.os!r}")
if device.attributes:
parts.append(f"attributes={device.attributes!r}")
return ", ".join(parts) if parts else "empty selector"
def _validate_device(cfg: Config) -> None:
device = cfg.aihub.device
try:
matches = hub.get_devices(name=device.name, os=device.os, attributes=device.attributes)
except Exception as e:
CONSOLE.print(f"[red]Unable to validate AI Hub device {_device_selector(device)}: {e}[/red]")
raise typer.Exit(1)
if matches:
return
CONSOLE.print(f"[red]AI Hub device not found: {_device_selector(device)}[/red]")
CONSOLE.print("Run [bold]qai-hub list-devices[/bold] to see valid device names.")
raise typer.Exit(1)
def _quantize_step(
cfg: Config,
config_path: str,
calibration_path: Path,
*,
model_id: str | None = None,
from_job: str | None = None,
model_s3_uri: str | None = None,
onnx_path: str | None = None,
) -> str:
st = state_ops.store(config_path)
specs = _input_specs(cfg)
try:
source = _resolve_model_source(
cfg,
config_path,
model_id=model_id,
previous_model_id=st.get_last_optimized_model_id(),
from_job=from_job,
model_s3_uri=model_s3_uri,
onnx_path=onnx_path,
)
calibration_data = _load_calibration(calibration_path, specs)
except (FileNotFoundError, ValueError) as e:
CONSOLE.print(f"[red]{e}[/red]")
raise typer.Exit(1)
try:
hub_model = (
hub.upload_model(str(source.model), name=cfg.aihub.model_name)
if isinstance(source.model, Path)
else hub.get_model(source.model)
)
result = aihub_jobs.submit_quantize_job(
hub_model,
calibration_data,
cfg.aihub.quantize_options,
job_name=_job_name(cfg, "quantize"),
)
except Exception as e:
CONSOLE.print(f"[red]AI Hub quantize failed: {e}[/red]")
raise typer.Exit(1)
updates: dict[str, Any] = {
"last_quantize_job_id": result["job_id"],
"last_quantized_model_id": result["model_id"],
}
if source.model_artifact:
updates["last_model_artifact"] = source.model_artifact
st.update(**updates)
CONSOLE.print(f"[green]✓[/green] Quantize job: [bold]{result['job_id']}[/bold]")
CONSOLE.print(f"[green]✓[/green] Quantized model: [bold]{result['model_id']}[/bold]")
return str(result["model_id"])
def _optimize_step(
cfg: Config,
config_path: str,
from_job: str | None,
model_s3_uri: str | None,
onnx_path: str | None,
) -> str:
st = state_ops.store(config_path)
_validate_device(cfg)
specs = _input_specs(cfg)
try:
source = _resolve_onnx_source(
cfg,
config_path,
from_job=from_job,
model_s3_uri=model_s3_uri,
onnx_path=onnx_path,
)
except (FileNotFoundError, ValueError) as e:
CONSOLE.print(f"[red]{e}[/red]")
raise typer.Exit(1)
try:
hub_model = hub.upload_model(str(source.onnx_path), name=cfg.aihub.model_name)
result = aihub_jobs.submit_compile_job(
model=hub_model,
device=cfg.aihub.device,
input_specs=specs,
target_runtime="onnx",
job_name=_job_name(cfg, "optimize"),
)
except Exception as e:
CONSOLE.print(f"[red]AI Hub ONNX optimization failed: {e}[/red]")
raise typer.Exit(1)
st.update(
last_model_artifact=source.model_artifact,
last_optimize_job_id=result["job_id"],
last_optimized_model_id=result["model_id"],
)
CONSOLE.print(f"[green]✓[/green] ONNX optimization job: [bold]{result['job_id']}[/bold]")
CONSOLE.print(f"[green]✓[/green] Optimized ONNX model: [bold]{result['model_id']}[/bold]")
return str(result["model_id"])
def _compile_step(
cfg: Config,
config_path: str,
*,
model_id: str | None = None,
from_job: str | None = None,
model_s3_uri: str | None = None,
onnx_path: str | None = None,
) -> str:
st = state_ops.store(config_path)
_validate_device(cfg)
specs = _input_specs(cfg)
try:
source = _resolve_model_source(
cfg,
config_path,
model_id=model_id,
previous_model_id=st.get_last_quantized_model_id(),
from_job=from_job,
model_s3_uri=model_s3_uri,
onnx_path=onnx_path,
)
except (FileNotFoundError, ValueError) as e:
CONSOLE.print(f"[red]{e}[/red]")
raise typer.Exit(1)
try:
hub_model = (
hub.upload_model(str(source.model), name=cfg.aihub.model_name)
if isinstance(source.model, Path)
else hub.get_model(source.model)
)
result = aihub_jobs.submit_compile_job(
model=hub_model,
device=cfg.aihub.device,
input_specs=specs,
target_runtime=cfg.aihub.target_runtime,
options=cfg.aihub.compile_options,
job_name=_job_name(cfg, "compile"),
)
except Exception as e:
CONSOLE.print(f"[red]AI Hub compile failed: {e}[/red]")
raise typer.Exit(1)
updates: dict[str, Any] = {
"last_compile_job_id": result["job_id"],
"last_compiled_model_id": result["model_id"],
}
if source.model_artifact:
updates["last_model_artifact"] = source.model_artifact
st.update(**updates)
CONSOLE.print(f"[green]✓[/green] Compile job: [bold]{result['job_id']}[/bold]")
CONSOLE.print(f"[green]✓[/green] Compiled model: [bold]{result['model_id']}[/bold]")
return str(result["model_id"])
def _validate_step(
cfg: Config,
config_path: str,
input_file: Path,
model_id: str | None,
input_name: str | None,
) -> str:
_validate_device(cfg)
specs = _input_specs(cfg)
resolved_model_id = _model_id_or_state(config_path, model_id)
try:
inputs = _load_inputs(input_file, specs, input_name)
except (FileNotFoundError, ValueError) as e:
CONSOLE.print(f"[red]{e}[/red]")
raise typer.Exit(1)
run = datetime.now().strftime("%Y%m%d-%H%M%S")
out_dir = Path(cfg.aihub.output_dir) / run / "validation"
try:
hub_model = hub.get_model(resolved_model_id)
result = aihub_jobs.submit_inference_job(
hub_model,
cfg.aihub.device,
inputs,
out_dir,
job_name=_job_name(cfg, "validate"),
)
except Exception as e:
CONSOLE.print(f"[red]AI Hub inference failed: {e}[/red]")
raise typer.Exit(1)
state_ops.store(config_path).update(last_inference_job_id=result["job_id"])
CONSOLE.print(f"[green]✓[/green] Inference job: [bold]{result['job_id']}[/bold]")
outputs = result.get("outputs")
if isinstance(outputs, dict):
for name, value in outputs.items():
CONSOLE.print(f" {name}: shape={getattr(value, 'shape', '?')}")
CONSOLE.print(f"Outputs: [cyan]{out_dir}[/cyan]")
return str(result["job_id"])
def _profile_step(cfg: Config, config_path: str, model_id: str | None) -> str:
_validate_device(cfg)
resolved_model_id = _model_id_or_state(config_path, model_id)
try:
hub_model = hub.get_model(resolved_model_id)
result = aihub_jobs.submit_profile_job(
hub_model,
cfg.aihub.device,
cfg.aihub.profile_options,
job_name=_job_name(cfg, "profile"),
)
except Exception as e:
CONSOLE.print(f"[red]AI Hub profile failed: {e}[/red]")
raise typer.Exit(1)
state_ops.store(config_path).update(last_profile_job_id=result["job_id"])
CONSOLE.print(f"[green]✓[/green] Profile job: [bold]{result['job_id']}[/bold]")
return str(result["job_id"])
@app.command()
def optimize(
from_job: str | None = typer.Option(None, "--from-job", help="Training job name whose model artifact should optimize"),
model_s3_uri: str | None = typer.Option(None, "--model-s3-uri", help="S3 URI of model.tar.gz to optimize"),
onnx_path: str | None = typer.Option(
None, "--onnx-path", help="Local ONNX path or ONNX path inside extracted artifact"
),
config: str = CONFIG_OPT,
) -> None:
"""Optimize an external model into a Workbench-produced ONNX model."""
cfg = load_cfg(config)
_optimize_step(cfg, config, from_job, model_s3_uri, onnx_path)
@app.command()
def quantize(
calibration_path: Path = typer.Argument(..., help="Calibration .npz file or directory of .npy samples"),
model_id: str | None = typer.Option(None, "--model-id", help="AI Hub optimized ONNX model ID"),
from_job: str | None = typer.Option(None, "--from-job", help="Training job name whose model artifact should quantize"),
model_s3_uri: str | None = typer.Option(None, "--model-s3-uri", help="S3 URI of model.tar.gz to quantize"),
onnx_path: str | None = typer.Option(
None, "--onnx-path", help="Local ONNX path or ONNX path inside extracted artifact"
),
config: str = CONFIG_OPT,
) -> None:
"""Quantize an ONNX model to INT8."""
cfg = load_cfg(config)
_quantize_step(
cfg,
config,
calibration_path,
model_id=model_id,
from_job=from_job,
model_s3_uri=model_s3_uri,
onnx_path=onnx_path,
)
@app.command()
def compile(
model_id: str | None = typer.Option(None, "--model-id", help="AI Hub model ID to compile"),
from_job: str | None = typer.Option(None, "--from-job", help="Training job name whose model artifact should compile"),
model_s3_uri: str | None = typer.Option(None, "--model-s3-uri", help="S3 URI of model.tar.gz to compile"),
onnx_path: str | None = typer.Option(
None, "--onnx-path", help="Local ONNX path or ONNX path inside extracted artifact"
),
config: str = CONFIG_OPT,
) -> None:
"""Compile a model for the configured Qualcomm AI Hub target."""
cfg = load_cfg(config)
_compile_step(
cfg,
config,
model_id=model_id,
from_job=from_job,
model_s3_uri=model_s3_uri,
onnx_path=onnx_path,
)
@app.command()
def validate(
input_file: Path = typer.Argument(..., help="NumPy .npz or .npy inputs to run on device"),
model_id: str | None = typer.Option(None, "--model-id", help="AI Hub compiled model ID"),
input_name: str | None = typer.Option(None, "--input-name", help="Input name for .npy files"),
config: str = CONFIG_OPT,
) -> None:
"""Run an AI Hub inference job using sample inputs."""
cfg = load_cfg(config)
_validate_step(cfg, config, input_file, model_id, input_name)
@app.command()
def profile(
model_id: str | None = typer.Option(None, "--model-id", help="AI Hub compiled model ID"),
config: str = CONFIG_OPT,
) -> None:
"""Profile a compiled model on the configured AI Hub device."""
cfg = load_cfg(config)
_profile_step(cfg, config, model_id)
@app.command()
def upload(
calibration_path: Path = typer.Argument(..., help="Calibration .npz file or directory of .npy samples"),
input_file: Path = typer.Argument(..., help="Validation .npz or .npy inputs to run on device"),
from_step: UploadStep = typer.Option(UploadStep.optimize, "--from-step", help="Resume from this Workbench step"),
from_job: str | None = typer.Option(None, "--from-job", help="Training job name whose model artifact should upload"),
model_s3_uri: str | None = typer.Option(None, "--model-s3-uri", help="S3 URI of model.tar.gz to upload"),
onnx_path: str | None = typer.Option(
None, "--onnx-path", help="Local ONNX path or ONNX path inside extracted artifact"
),
input_name: str | None = typer.Option(None, "--input-name", help="Input name for .npy validation files"),
config: str = CONFIG_OPT,
) -> None:
"""Optimize, quantize, optionally compile, validate, and profile a model."""
cfg = load_cfg(config)
steps = [UploadStep.optimize, UploadStep.quantize, UploadStep.compile, UploadStep.validate, UploadStep.profile]
selected = steps[steps.index(from_step) :]
optimized_model_id: str | None = None
quantized_model_id: str | None = None
compiled_model_id: str | None = None
if UploadStep.optimize in selected:
optimized_model_id = _optimize_step(cfg, config, from_job, model_s3_uri, onnx_path)
if UploadStep.quantize in selected:
if UploadStep.optimize not in selected:
optimized_model_id = state_ops.store(config).get_last_optimized_model_id()
if not optimized_model_id:
CONSOLE.print(
"[red]No optimized ONNX model found. Resume from --from-step optimize or run "
"'qc-cli ai-hub optimize' first.[/red]"
)
raise typer.Exit(1)
quantized_model_id = _quantize_step(
cfg,
config,
calibration_path,
model_id=optimized_model_id,
)
if UploadStep.compile in selected:
if cfg.aihub.target_runtime == "onnx":
compiled_model_id = quantized_model_id or state_ops.store(config).get_last_quantized_model_id()
if not compiled_model_id:
CONSOLE.print(
"[red]No quantized ONNX model found. Resume from --from-step quantize or run "
"'qc-cli ai-hub quantize' first.[/red]"
)
raise typer.Exit(1)
state_ops.store(config).update(last_compiled_model_id=compiled_model_id)
CONSOLE.print("[green]✓[/green] Target runtime is ONNX; skipping final compile.")
else:
compiled_model_id = _compile_step(
cfg,
config,
model_id=quantized_model_id,
)
if UploadStep.validate in selected:
_validate_step(cfg, config, input_file, compiled_model_id, input_name)
if UploadStep.profile in selected:
_profile_step(cfg, config, compiled_model_id)
@app.command()
def download(
model_id: str | None = typer.Option(None, "--model-id", help="AI Hub compiled model ID"),
output: Path | None = typer.Option(None, "--output", "-o", help="Destination file path"),
config: str = CONFIG_OPT,
) -> None:
"""Download the last compiled deployable artifact from AI Hub."""
cfg = load_cfg(config)
resolved_model_id = _model_id_or_state(config, model_id)
ext = _RUNTIME_EXTENSIONS.get(cfg.aihub.target_runtime, cfg.aihub.target_runtime)
dest = output or (Path(cfg.aihub.output_dir) / f"model.{ext}")
try:
written = aihub_jobs.download_model(resolved_model_id, dest)
except Exception as e:
CONSOLE.print(f"[red]AI Hub download failed: {e}[/red]")
raise typer.Exit(1)
state_ops.store(config).update(last_downloaded_model=written)
CONSOLE.print(f"[green]✓[/green] Downloaded model: [cyan]{written}[/cyan]")

View File

@@ -77,7 +77,8 @@ def setup(
if outputs.get("SageMakerRoleArn"): if outputs.get("SageMakerRoleArn"):
CONSOLE.print(f"[green]✓[/green] IAM role: {outputs['SageMakerRoleArn']}") CONSOLE.print(f"[green]✓[/green] IAM role: {outputs['SageMakerRoleArn']}")
if cfg.mlflow.mode is MlflowMode.create and outputs.get("MlflowTrackingServerArn"): if cfg.mlflow.mode is MlflowMode.create and outputs.get("MlflowTrackingServerArn"):
CONSOLE.print(f"[green]✓[/green] MLflow: {outputs['MlflowTrackingServerArn']}") mlflow_name = outputs.get("MlflowTrackingServerName", cfg.managed_mlflow_tracking_server_name)
CONSOLE.print(f"[green]✓[/green] MLflow: {mlflow_name}")
elif cfg.mlflow.mode is MlflowMode.existing: elif cfg.mlflow.mode is MlflowMode.existing:
CONSOLE.print(f"[green]✓[/green] MLflow: {cfg.mlflow.tracking_server_name}") CONSOLE.print(f"[green]✓[/green] MLflow: {cfg.mlflow.tracking_server_name}")
CONSOLE.print("\n[bold green]Infrastructure ready.[/bold green]") CONSOLE.print("\n[bold green]Infrastructure ready.[/bold green]")
@@ -102,7 +103,7 @@ def status(config: str = CONFIG_OPT) -> None:
if cfg.mlflow.mode is not MlflowMode.disabled: if cfg.mlflow.mode is not MlflowMode.disabled:
table.add_row( table.add_row(
"MLflow", "MLflow",
cfg.mlflow.tracking_server_name or "-", cfg.effective_mlflow_tracking_server_name or "-",
"[red]unknown[/red]", "[red]unknown[/red]",
"-", "-",
) )
@@ -126,7 +127,7 @@ def status(config: str = CONFIG_OPT) -> None:
if cfg.mlflow.mode is MlflowMode.create: if cfg.mlflow.mode is MlflowMode.create:
table.add_row( table.add_row(
"MLflow", "MLflow",
cfg.mlflow.tracking_server_name or "-", outputs.get("MlflowTrackingServerName", cfg.managed_mlflow_tracking_server_name),
"[green]managed[/green]", "[green]managed[/green]",
outputs.get("MlflowTrackingServerArn", outputs.get("MlflowArtifactUri", "-")), outputs.get("MlflowTrackingServerArn", outputs.get("MlflowArtifactUri", "-")),
) )
@@ -209,6 +210,7 @@ def _role_name(configured_name: str, role_arn: str) -> str:
return role_arn.rsplit("/", 1)[-1] return role_arn.rsplit("/", 1)[-1]
return "-" return "-"
def _destroy_account_id(config_path: str, cfg: Config) -> str: def _destroy_account_id(config_path: str, cfg: Config) -> str:
config_dir = str(Path(config_path).parent) config_dir = str(Path(config_path).parent)
state = read_infra_state(config_dir) state = read_infra_state(config_dir)

40
src/commands/init.py Normal file
View File

@@ -0,0 +1,40 @@
import secrets
from pathlib import Path
import typer
import yaml
from src.commands.utils import CONSOLE
from src.config import GENERATED_STACK_PREFIX, Config, InfraConfig, S3Config
app = typer.Typer()
@app.command()
def init(
output: str = typer.Option("config.yaml", help="Destination path for the config file"),
force: bool = typer.Option(False, "--force", "-f", help="Overwrite an existing config file"),
) -> None:
"""Write a starter config.yaml to the current directory."""
dest = Path(output)
if dest.exists() and not force:
CONSOLE.print(f"[yellow]{dest} already exists.[/yellow] Use --force to overwrite.")
raise typer.Exit(1)
config = _new_isolated_config()
dest.parent.mkdir(parents=True, exist_ok=True)
config_data = config.model_dump(mode="json")
config_data["sagemaker"].pop("role_name", None)
with open(dest, "w") as f:
yaml.safe_dump(config_data, f, sort_keys=False)
CONSOLE.print(f"[green]✓[/green] Config written to [bold]{dest}[/bold]")
CONSOLE.print("Edit [cyan]sagemaker.training.image_uri[/cyan] before running training commands.")
def _new_isolated_config() -> Config:
suffix = secrets.token_hex(6)
namespace = f"{GENERATED_STACK_PREFIX}{suffix}"
config = Config(infra=InfraConfig(stack_name=namespace))
config.s3 = S3Config(bucket=f"{namespace}-data")
return config

41
src/commands/mlflow.py Normal file
View File

@@ -0,0 +1,41 @@
import webbrowser
import typer
from src.aws import mlflow as aws_mlflow
from src.commands.utils import CONFIG_OPT, CONSOLE, load_cfg
app = typer.Typer(help="Manage MLflow tracking server access")
@app.command(name="open")
def open_mlflow(config: str = CONFIG_OPT) -> None:
"""Open a presigned URL for the configured MLflow tracking server."""
cfg = load_cfg(config)
tracking_server_name = cfg.effective_mlflow_tracking_server_name
if not tracking_server_name:
CONSOLE.print("[red]MLflow is disabled in config.yaml.[/red]")
raise typer.Exit(1)
try:
url = aws_mlflow.create_presigned_tracking_server_url(
cfg.aws.region,
cfg.aws.profile,
tracking_server_name,
)
except Exception as e:
CONSOLE.print("[yellow]Could not create a SageMaker MLflow UI URL.[/yellow]")
CONSOLE.print(f"Tracking server: [cyan]{tracking_server_name}[/cyan]")
CONSOLE.print(f"Reason: {e}")
CONSOLE.print(
"This command can create presigned URLs only for MLflow tracking servers managed by "
"Amazon SageMaker. If this is an external MLflow server, open it with that server's own URL."
)
raise typer.Exit(1)
CONSOLE.print(f"MLflow tracking server: [cyan]{tracking_server_name}[/cyan]")
CONSOLE.print(f"MLflow UI: {url}")
if webbrowser.open(url):
CONSOLE.print("[green]✓[/green] Opened MLflow UI in your browser.")
else:
CONSOLE.print("[yellow]Could not open a browser automatically. Open the URL above manually.[/yellow]")

View File

@@ -8,8 +8,9 @@ from src import state as state_ops
from src.aws import iam from src.aws import iam
from src.aws import sagemaker as sm_ops from src.aws import sagemaker as sm_ops
from src.commands.utils import CONFIG_OPT, CONSOLE, load_cfg from src.commands.utils import CONFIG_OPT, CONSOLE, load_cfg
from src.config import Config from src.config import Config, MlflowMode
from src.infra.state import read_infra_state from src.infra.state import read_infra_state
from src.tracking.mlflow import MlflowTracker
app = typer.Typer(help="Manage SageMaker training jobs") app = typer.Typer(help="Manage SageMaker training jobs")
@@ -22,6 +23,14 @@ _STATUS_COLOR = {
} }
def _tracker(cfg):
try:
return MlflowTracker.from_config(cfg)
except Exception as e:
CONSOLE.print(f"[red]MLflow setup failed: {e}[/red]")
raise typer.Exit(1)
def _config_dir(config_path: str) -> str: def _config_dir(config_path: str) -> str:
return str(Path(config_path).parent) return str(Path(config_path).parent)
@@ -58,6 +67,7 @@ def start(config: str = CONFIG_OPT) -> None:
CONSOLE.print(f"[red]{e}[/red]") CONSOLE.print(f"[red]{e}[/red]")
raise typer.Exit(1) raise typer.Exit(1)
tracker = _tracker(cfg)
job_name = f"qc-cli-{datetime.now().strftime('%Y%m%d-%H%M%S')}" job_name = f"qc-cli-{datetime.now().strftime('%Y%m%d-%H%M%S')}"
s3_train_uri = f"s3://{cfg.s3.bucket}/{cfg.s3.data_prefix}" s3_train_uri = f"s3://{cfg.s3.bucket}/{cfg.s3.data_prefix}"
s3_output = f"s3://{cfg.s3.bucket}/{cfg.s3.model_prefix}" s3_output = f"s3://{cfg.s3.bucket}/{cfg.s3.model_prefix}"
@@ -77,9 +87,21 @@ def start(config: str = CONFIG_OPT) -> None:
) )
sm_ops.start_training_job(cfg.aws.boto3_session, training_job) sm_ops.start_training_job(cfg.aws.boto3_session, training_job)
state_ops.write_state(_config_dir(config), last_training_job=job_name) st = state_ops.store(config)
st.set_last_training_job(job_name)
run_id = tracker.start_training_run(
training_job,
region=cfg.aws.region,
profile=cfg.aws.profile,
role_arn=role_arn,
)
if run_id:
st.update_training_job(job_name, mlflow_run_id=run_id)
CONSOLE.print(f"[green]✓[/green] Job submitted: [bold]{job_name}[/bold]") CONSOLE.print(f"[green]✓[/green] Job submitted: [bold]{job_name}[/bold]")
if run_id:
CONSOLE.print(f"MLflow run: [cyan]{run_id}[/cyan]")
CONSOLE.print("Open MLflow: [cyan]qc-cli mlflow open[/cyan]")
CONSOLE.print("Track progress: [cyan]qc-cli train status[/cyan]") CONSOLE.print("Track progress: [cyan]qc-cli train status[/cyan]")
@@ -90,9 +112,10 @@ def status(
) -> None: ) -> None:
"""Show training job status.""" """Show training job status."""
cfg = load_cfg(config) cfg = load_cfg(config)
st = state_ops.store(config)
if not job_name: if not job_name:
job_name = state_ops.get_last_training_job(_config_dir(config)) job_name = st.get_last_training_job()
if not job_name: if not job_name:
CONSOLE.print( CONSOLE.print(
"[red]No training job found in state. Pass a job name or run 'qc-cli train start' first.[/red]" "[red]No training job found in state. Pass a job name or run 'qc-cli train start' first.[/red]"
@@ -111,6 +134,25 @@ def status(
if status.failure_reason: if status.failure_reason:
CONSOLE.print(f"[red]Failure: {status.failure_reason}[/red]") CONSOLE.print(f"[red]Failure: {status.failure_reason}[/red]")
job_state = st.get_training_job(job_name)
run_id = job_state.get("mlflow_run_id")
already_registered = job_state.get("registered_model_version")
if run_id and not already_registered and status.status in {"Completed", "Failed", "Stopped"}:
tracker = _tracker(cfg)
version = tracker.finalize_training_run(
run_id=str(run_id),
training_job_status=status,
)
updates = {"mlflow_finalized_status": status.status}
if version:
updates["registered_model_version"] = version
st.update_training_job(job_name, **updates)
if version:
st.set_latest_experiment_model_version(version)
CONSOLE.print(f"MLflow model version: [cyan]{version}[/cyan] ([cyan]experiment-latest[/cyan])")
if run_id and cfg.mlflow.mode is not MlflowMode.disabled:
CONSOLE.print("Open MLflow: [cyan]qc-cli mlflow open[/cyan]")
@app.command(name="list") @app.command(name="list")
def list_jobs( def list_jobs(

70
src/commands/upload.py Normal file
View File

@@ -0,0 +1,70 @@
from pathlib import Path
import typer
from rich.progress import BarColumn, Progress, SpinnerColumn, TaskProgressColumn, TextColumn
from src.aws import s3 as s3_ops
from src.commands.utils import CONFIG_OPT, CONSOLE, load_cfg
app = typer.Typer()
@app.command()
def upload(
path: Path = typer.Argument(..., help="Local file or directory to upload"),
s3_key: str | None = typer.Option(None, "--s3-key", help="S3 key for file uploads"),
config: str = CONFIG_OPT,
) -> None:
"""Upload a local file or directory to S3."""
cfg = load_cfg(config)
if path.is_file():
key = s3_key or f"{cfg.s3.data_prefix.rstrip('/')}/{path.name}"
try:
with CONSOLE.status(f"Uploading {path.name}..."):
uri = s3_ops.upload_file(cfg.aws.region, cfg.aws.profile, cfg.s3.bucket, str(path), key)
except Exception as e:
CONSOLE.print(f"[red]Upload failed: {e}[/red]")
raise typer.Exit(1)
CONSOLE.print(f"[green]✓[/green] {path.name} -> {uri}")
return
if path.is_dir():
if s3_key is not None:
CONSOLE.print("[red]--s3-key can only be used when uploading a single file.[/red]")
raise typer.Exit(1)
files = [file for file in path.rglob("*") if file.is_file()]
if not files:
CONSOLE.print("[yellow]No files found in directory.[/yellow]")
raise typer.Exit(0)
prefix = cfg.s3.data_prefix
CONSOLE.print(f"Uploading {len(files)} files to s3://{cfg.s3.bucket}/{prefix.rstrip('/')}/")
try:
with Progress(
SpinnerColumn(),
TextColumn("[progress.description]{task.description}"),
BarColumn(),
TaskProgressColumn(),
console=CONSOLE,
) as progress:
task = progress.add_task("Uploading...", total=len(files))
count = s3_ops.upload_dir(
cfg.aws.region,
cfg.aws.profile,
cfg.s3.bucket,
str(path),
prefix,
on_progress=lambda: progress.advance(task),
)
except Exception as e:
CONSOLE.print(f"[red]Upload failed: {e}[/red]")
raise typer.Exit(1)
CONSOLE.print(f"[green]✓[/green] Uploaded {count} files to s3://{cfg.s3.bucket}/{prefix.rstrip('/')}/")
return
CONSOLE.print(f"[red]Path not found: {path}[/red]")
raise typer.Exit(1)

View File

@@ -1,19 +1,20 @@
import re import re
from enum import Enum from enum import StrEnum
from typing import Any, Literal, TypedDict from typing import Any, Literal, TypedDict
from mypy_boto3_s3.literals import BucketLocationConstraintType from mypy_boto3_s3.literals import BucketLocationConstraintType
from mypy_boto3_sagemaker.literals import TrainingInstanceTypeType from mypy_boto3_sagemaker.literals import TrainingInstanceTypeType
from pydantic import BaseModel, Field, model_validator from pydantic import BaseModel, Field, field_validator, model_validator
from qai_hub.client import Device
class MlflowMode(str, Enum): class MlflowMode(StrEnum):
disabled = "disabled" disabled = "disabled"
create = "create" create = "create"
existing = "existing" existing = "existing"
class MlflowServerSize(str, Enum): class MlflowServerSize(StrEnum):
small = "Small" small = "Small"
medium = "Medium" medium = "Medium"
large = "Large" large = "Large"
@@ -80,9 +81,31 @@ class SageMakerConfig(BaseModel):
training: TrainingConfig = Field(default_factory=TrainingConfig) training: TrainingConfig = Field(default_factory=TrainingConfig)
class AIHubConfig(BaseModel):
device: Device = Field(default_factory=lambda: Device("Samsung Galaxy S25 (Family)"))
target_runtime: str = "tflite"
input_specs: dict[str, tuple[list[int], str]] = Field(default_factory=dict)
job_name: str | None = None
model_name: str | None = None
compile_options: str | None = None
profile_options: str | None = None
quantize_options: str | None = None
output_dir: str = "build/qai-hub"
@field_validator("device", mode="before")
@classmethod
def parse_device(cls, value: Any) -> Any:
if isinstance(value, str):
return Device(value)
return value
class MlflowConfig(BaseModel): class MlflowConfig(BaseModel):
mode: MlflowMode = MlflowMode.disabled mode: MlflowMode = MlflowMode.disabled
tracking_server_name: str | None = None tracking_server_name: str | None = None
experiment_name: str = "qc-cli-training"
registered_model_name: str = "qc-cli-model"
register_trained_models: bool = True
artifact_prefix: str = "mlflow/" artifact_prefix: str = "mlflow/"
tracking_server_size: MlflowServerSize = MlflowServerSize.small tracking_server_size: MlflowServerSize = MlflowServerSize.small
mlflow_version: str | None = None mlflow_version: str | None = None
@@ -91,8 +114,8 @@ class MlflowConfig(BaseModel):
@model_validator(mode="after") @model_validator(mode="after")
def require_tracking_server_name(self) -> "MlflowConfig": def require_tracking_server_name(self) -> "MlflowConfig":
if self.mode in {MlflowMode.create, MlflowMode.existing} and not self.tracking_server_name: if self.mode is MlflowMode.existing and not self.tracking_server_name:
raise ValueError("mlflow.tracking_server_name is required when mlflow.mode is create or existing") raise ValueError("mlflow.tracking_server_name is required when mlflow.mode is existing")
return self return self
@@ -101,4 +124,17 @@ class Config(BaseModel):
aws: AwsConfig = Field(default_factory=AwsConfig) aws: AwsConfig = Field(default_factory=AwsConfig)
s3: S3Config = Field(default_factory=S3Config) s3: S3Config = Field(default_factory=S3Config)
sagemaker: SageMakerConfig = Field(default_factory=SageMakerConfig) sagemaker: SageMakerConfig = Field(default_factory=SageMakerConfig)
aihub: AIHubConfig = Field(default_factory=AIHubConfig)
mlflow: MlflowConfig = Field(default_factory=MlflowConfig) mlflow: MlflowConfig = Field(default_factory=MlflowConfig)
@property
def managed_mlflow_tracking_server_name(self) -> str:
return f"{self.infra.stack_name}-mlflow"
@property
def effective_mlflow_tracking_server_name(self) -> str | None:
if self.mlflow.mode is MlflowMode.disabled:
return None
if self.mlflow.mode is MlflowMode.existing:
return self.mlflow.tracking_server_name
return self.managed_mlflow_tracking_server_name

View File

@@ -74,6 +74,7 @@ class QCStack(Stack):
CfnOutput(self, "SageMakerRoleArn", value=role.attr_arn) CfnOutput(self, "SageMakerRoleArn", value=role.attr_arn)
if config.mlflow.mode is MlflowMode.create: if config.mlflow.mode is MlflowMode.create:
tracking_server_name = config.managed_mlflow_tracking_server_name
artifact_prefix = config.mlflow.artifact_prefix.strip("/") artifact_prefix = config.mlflow.artifact_prefix.strip("/")
artifact_uri = ( artifact_uri = (
f"s3://{data_bucket.bucket_name}/{artifact_prefix}/" f"s3://{data_bucket.bucket_name}/{artifact_prefix}/"
@@ -145,14 +146,14 @@ class QCStack(Stack):
"MlflowTrackingServer", "MlflowTrackingServer",
artifact_store_uri=artifact_uri, artifact_store_uri=artifact_uri,
role_arn=mlflow_role.attr_arn, role_arn=mlflow_role.attr_arn,
tracking_server_name=config.mlflow.tracking_server_name or "", tracking_server_name=tracking_server_name,
automatic_model_registration=config.mlflow.automatic_model_registration, automatic_model_registration=config.mlflow.automatic_model_registration,
mlflow_version=config.mlflow.mlflow_version, mlflow_version=config.mlflow.mlflow_version,
tracking_server_size=config.mlflow.tracking_server_size.value, tracking_server_size=config.mlflow.tracking_server_size.value,
weekly_maintenance_window_start=config.mlflow.weekly_maintenance_window_start, weekly_maintenance_window_start=config.mlflow.weekly_maintenance_window_start,
) )
CfnOutput(self, "MlflowTrackingServerName", value=config.mlflow.tracking_server_name or "") CfnOutput(self, "MlflowTrackingServerName", value=tracking_server_name)
CfnOutput(self, "MlflowTrackingServerArn", value=tracking_server.attr_tracking_server_arn) CfnOutput(self, "MlflowTrackingServerArn", value=tracking_server.attr_tracking_server_arn)
CfnOutput(self, "MlflowArtifactUri", value=artifact_uri) CfnOutput(self, "MlflowArtifactUri", value=artifact_uri)
CfnOutput(self, "MlflowRoleArn", value=mlflow_role.attr_arn) CfnOutput(self, "MlflowRoleArn", value=mlflow_role.attr_arn)

View File

@@ -1,114 +1,14 @@
import secrets
from pathlib import Path
import typer import typer
import yaml
from rich.console import Console
from rich.progress import BarColumn, Progress, SpinnerColumn, TaskProgressColumn, TextColumn
from src.aws import s3 as s3_ops from src.commands import ai_hub, infra, init, mlflow, train, upload
from src.commands import infra, train
from src.commands.utils import CONFIG_OPT, load_cfg
from src.config import GENERATED_STACK_PREFIX, Config, InfraConfig, S3Config
app = typer.Typer( app = typer.Typer(
help="qc-cli: End-to-end model managment for Qualcomm AI Hub.", help="qc-cli: End-to-end model managment for Qualcomm AI Hub.",
no_args_is_help=True, no_args_is_help=True,
) )
app.add_typer(init.app)
app.add_typer(upload.app)
app.add_typer(mlflow.app, name="mlflow")
app.add_typer(infra.app, name="infra") app.add_typer(infra.app, name="infra")
app.add_typer(train.app, name="train") app.add_typer(train.app, name="train")
app.add_typer(ai_hub.app, name="ai-hub")
console = Console()
@app.command()
def init(
output: str = typer.Option("config.yaml", help="Destination path for the config file"),
force: bool = typer.Option(False, "--force", "-f", help="Overwrite an existing config file"),
) -> None:
"""Write a starter config.yaml to the current directory."""
dest = Path(output)
if dest.exists() and not force:
console.print(f"[yellow]{dest} already exists.[/yellow] Use --force to overwrite.")
raise typer.Exit(1)
config = _new_isolated_config()
dest.parent.mkdir(parents=True, exist_ok=True)
config_data = config.model_dump(mode="json")
config_data["sagemaker"].pop("role_name", None)
with open(dest, "w") as f:
yaml.safe_dump(config_data, f, sort_keys=False)
console.print(f"[green]✓[/green] Config written to [bold]{dest}[/bold]")
console.print(
"Edit [cyan]sagemaker.training.image_uri[/cyan] before running training commands."
)
def _new_isolated_config() -> Config:
suffix = secrets.token_hex(6)
namespace = f"{GENERATED_STACK_PREFIX}{suffix}"
config = Config(infra=InfraConfig(stack_name=namespace))
config.s3 = S3Config(bucket=f"{namespace}-data")
return config
@app.command()
def upload(
path: Path = typer.Argument(..., help="Local file or directory to upload"),
s3_key: str | None = typer.Option(None, "--s3-key", help="S3 key for file uploads"),
config: str = CONFIG_OPT,
) -> None:
"""Upload a local file or directory to S3."""
cfg = load_cfg(config)
if path.is_file():
key = s3_key or f"{cfg.s3.data_prefix.rstrip('/')}/{path.name}"
try:
with console.status(f"Uploading {path.name}..."):
uri = s3_ops.upload_file(cfg.aws.region, cfg.aws.profile, cfg.s3.bucket, str(path), key)
except Exception as e:
console.print(f"[red]Upload failed: {e}[/red]")
raise typer.Exit(1)
console.print(f"[green]✓[/green] {path.name} -> {uri}")
return
if path.is_dir():
if s3_key is not None:
console.print("[red]--s3-key can only be used when uploading a single file.[/red]")
raise typer.Exit(1)
files = [file for file in path.rglob("*") if file.is_file()]
if not files:
console.print("[yellow]No files found in directory.[/yellow]")
raise typer.Exit(0)
prefix = cfg.s3.data_prefix
console.print(f"Uploading {len(files)} files to s3://{cfg.s3.bucket}/{prefix.rstrip('/')}/")
try:
with Progress(
SpinnerColumn(),
TextColumn("[progress.description]{task.description}"),
BarColumn(),
TaskProgressColumn(),
console=console,
) as progress:
task = progress.add_task("Uploading...", total=len(files))
count = s3_ops.upload_dir(
cfg.aws.region,
cfg.aws.profile,
cfg.s3.bucket,
str(path),
prefix,
on_progress=lambda: progress.advance(task),
)
except Exception as e:
console.print(f"[red]Upload failed: {e}[/red]")
raise typer.Exit(1)
console.print(f"[green]✓[/green] Uploaded {count} files to s3://{cfg.s3.bucket}/{prefix.rstrip('/')}/")
return
console.print(f"[red]Path not found: {path}[/red]")
raise typer.Exit(1)

0
src/qualcomm/__init__.py Normal file
View File

114
src/qualcomm/aihub_jobs.py Normal file
View File

@@ -0,0 +1,114 @@
from pathlib import Path
from typing import Any, TypedDict
import qai_hub.hub as hub
from qai_hub.client import CompileJob, Device, InferenceJob, Model, ProfileJob, QuantizeDtype, QuantizeJob
class ModelJobResult(TypedDict):
job: CompileJob | QuantizeJob
job_id: str
model: Model
model_id: str
class InferenceJobResult(TypedDict):
job: InferenceJob
job_id: str
outputs: Any
class ProfileJobResult(TypedDict):
job: ProfileJob
job_id: str
def _dataset_entries(inputs: dict[str, Any]) -> dict[str, list[Any]]:
return {name: value if isinstance(value, list) else [value] for name, value in inputs.items()}
def submit_compile_job(
model: Model,
device: Device,
input_specs: dict[str, tuple[tuple[int, ...], str]],
target_runtime: str,
options: str | None = None,
job_name: str | None = None,
) -> ModelJobResult:
compile_options = f"--target_runtime {target_runtime}"
if options:
compile_options = f"{compile_options} {options}"
job = hub.submit_compile_job(
model=model,
device=device,
name=job_name,
input_specs=input_specs,
options=compile_options,
)
target_model = job.get_target_model()
if target_model is None:
raise RuntimeError(f"Compile job {job.job_id} did not produce a target model.")
return {"job": job, "job_id": str(job.job_id), "model": target_model, "model_id": str(target_model.model_id)}
def submit_inference_job(
model: Model,
device: Device,
inputs: dict[str, Any],
output_dir: str | Path,
job_name: str | None = None,
) -> InferenceJobResult:
job = hub.submit_inference_job(
model=model,
device=device,
inputs=_dataset_entries(inputs),
name=job_name,
)
out = Path(output_dir)
out.mkdir(parents=True, exist_ok=True)
data = job.download_output_data(str(out))
return {"job": job, "job_id": str(job.job_id), "outputs": data}
def submit_profile_job(
model: Model,
device: Device,
options: str | None = None,
job_name: str | None = None,
) -> ProfileJobResult:
job = hub.submit_profile_job(
model=model,
device=device,
name=job_name,
options=options or "",
)
return {"job": job, "job_id": str(job.job_id)}
def submit_quantize_job(
model: Model,
calibration_data: dict[str, Any],
options: str | None = None,
job_name: str | None = None,
) -> ModelJobResult:
job = hub.submit_quantize_job(
model=model,
calibration_data=_dataset_entries(calibration_data),
weights_dtype=QuantizeDtype.INT8,
activations_dtype=QuantizeDtype.INT8,
name=job_name,
options=options or "",
)
target_model = job.get_target_model()
if target_model is None:
raise RuntimeError(f"Quantize job {job.job_id} did not produce a target model.")
return {"job": job, "job_id": str(job.job_id), "model": target_model, "model_id": str(target_model.model_id)}
def download_model(model_id: str, output_path: str | Path) -> str:
dest = Path(output_path)
dest.parent.mkdir(parents=True, exist_ok=True)
model = hub.get_model(model_id)
result = model.download(str(dest))
return str(result or dest)

83
src/qualcomm/artifacts.py Normal file
View File

@@ -0,0 +1,83 @@
import tarfile
from dataclasses import dataclass
from pathlib import Path
from src.aws import s3 as s3_ops
from src.aws import sagemaker as sm_ops
from src.config import Config
@dataclass(frozen=True)
class ResolvedOnnx:
onnx_path: Path
model_artifact: str | None
run_name: str
def _safe_extract(tar: tarfile.TarFile, dest: Path) -> None:
dest_root = dest.resolve()
for member in tar.getmembers():
target = (dest / member.name).resolve()
if dest_root != target and dest_root not in target.parents:
raise ValueError(f"Unsafe tar member path: {member.name}")
tar.extractall(dest, filter="data")
def _find_onnx(root: Path, explicit: str | None = None) -> Path:
if explicit:
p = Path(explicit)
if not p.is_absolute():
p = root / p
if not p.exists():
raise FileNotFoundError(f"ONNX file not found: {p}")
return p
matches = sorted(root.rglob("model.onnx"))
if not matches:
matches = sorted(root.rglob("*.onnx"))
if not matches:
raise FileNotFoundError(f"No ONNX file found under {root}")
if len(matches) > 1:
joined = ", ".join(str(p.relative_to(root)) for p in matches)
raise ValueError(f"Multiple ONNX files found ({joined}). Pass --onnx-path.")
return matches[0]
def resolve_onnx(
cfg: Config,
output_dir: str,
from_job: str | None = None,
model_s3_uri: str | None = None,
onnx_path: str | None = None,
last_training_job: str | None = None,
) -> ResolvedOnnx:
if onnx_path:
path = Path(onnx_path)
if path.exists():
return ResolvedOnnx(onnx_path=path, model_artifact=None, run_name=path.stem)
job = from_job or last_training_job
artifact = model_s3_uri
if not artifact:
if not job:
raise ValueError("No model source found. Pass --onnx-path, --model-s3-uri, --from-job, or run training first.")
artifact = sm_ops.get_model_artifacts(cfg.aws.region, cfg.aws.profile, job)
run_name = job or Path(artifact).name.removesuffix(".tar.gz").replace("/", "-")
root = Path(output_dir) / run_name / "source"
tar_path = root / "model.tar.gz"
s3_ops.download_file(cfg.aws.region, cfg.aws.profile, artifact, str(tar_path))
extract_dir = root / "extracted"
extract_dir.mkdir(parents=True, exist_ok=True)
try:
with tarfile.open(tar_path, "r:gz") as tar:
_safe_extract(tar, extract_dir)
except tarfile.TarError as e:
raise ValueError(f"Invalid model tarball: {tar_path}") from e
return ResolvedOnnx(
onnx_path=_find_onnx(extract_dir, onnx_path),
model_artifact=artifact,
run_name=run_name,
)

View File

@@ -1,30 +1,85 @@
import json import json
from dataclasses import dataclass
from pathlib import Path from pathlib import Path
from typing import Any from typing import Any
STATE_FILE = ".qc-cli.json" STATE_FILE = ".qc-cli.json"
def _path(config_dir: str) -> Path: @dataclass(frozen=True)
return Path(config_dir) / STATE_FILE class CliStateStore:
config_dir: str = "."
@property
def path(self) -> Path:
return Path(self.config_dir) / STATE_FILE
def read_state(config_dir: str = ".") -> dict[str, Any]: def read(self) -> dict[str, Any]:
path = _path(config_dir) if not self.path.exists():
if not path.exists():
return {} return {}
with open(path) as f: with open(self.path) as f:
return json.load(f) value = json.load(f)
return dict(value) if isinstance(value, dict) else {}
def update(self, **updates: Any) -> None:
def write_state(config_dir: str = ".", **updates: str | None) -> None: state = self.read()
path = _path(config_dir)
state = read_state(config_dir)
state.update(updates) state.update(updates)
with open(path, "w") as f: self._write(state)
def get(self, key: str, default: Any = None) -> Any:
return self.read().get(key, default)
def get_last_training_job(self) -> str | None:
value = self.get("last_training_job")
return str(value) if value else None
def get_last_model_artifact(self) -> str | None:
value = self.get("last_model_artifact")
return str(value) if value else None
def get_last_optimized_model_id(self) -> str | None:
value = self.get("last_optimized_model_id")
return str(value) if value else None
def get_last_quantized_model_id(self) -> str | None:
value = self.get("last_quantized_model_id")
return str(value) if value else None
def get_last_compiled_model_id(self) -> str | None:
value = self.get("last_compiled_model_id")
return str(value) if value else None
def get_last_downloaded_model(self) -> str | None:
value = self.get("last_downloaded_model")
return str(value) if value else None
def set_last_training_job(self, job_name: str) -> None:
self.update(last_training_job=job_name)
def get_training_job(self, job_name: str) -> dict[str, Any]:
jobs = self._training_jobs(self.read())
value = jobs.get(job_name, {})
return dict(value) if isinstance(value, dict) else {}
def update_training_job(self, job_name: str, **updates: Any) -> None:
state = self.read()
jobs = self._training_jobs(state)
jobs[job_name] = {**jobs.get(job_name, {}), **updates}
state["training_jobs"] = jobs
self._write(state)
def set_latest_experiment_model_version(self, version: str) -> None:
self.update(latest_experiment_model_version=version)
def _write(self, state: dict[str, Any]) -> None:
with open(self.path, "w") as f:
json.dump(state, f, indent=2) json.dump(state, f, indent=2)
def _training_jobs(self, state: dict[str, Any]) -> dict[str, Any]:
value = state.get("training_jobs", {})
return dict(value) if isinstance(value, dict) else {}
def get_last_training_job(config_dir: str = ".") -> str | None:
value = read_state(config_dir).get("last_training_job") def store(config_path: str) -> CliStateStore:
return str(value) if value else None config_dir = str(Path(config_path).parent)
return CliStateStore(config_dir)

3
src/tracking/__init__.py Normal file
View File

@@ -0,0 +1,3 @@
from src.tracking.mlflow import MlflowTracker, NoopTracker, Tracker
__all__ = ["MlflowTracker", "NoopTracker", "Tracker"]

153
src/tracking/mlflow.py Normal file
View File

@@ -0,0 +1,153 @@
import os
from dataclasses import dataclass
from typing import Any, Protocol
import mlflow
from mlflow.tracking import MlflowClient
from src.aws import mlflow as aws_mlflow
from src.config import Config, MlflowMode
class Tracker(Protocol):
def start_training_run(self, training_job: Any, *, region: str, profile: str, role_arn: str) -> str | None: ...
def finalize_training_run(self, *, run_id: str | None, training_job_status: Any) -> str | None: ...
@dataclass(frozen=True)
class NoopTracker:
def start_training_run(self, training_job: Any, *, region: str, profile: str, role_arn: str) -> str | None:
return None
def finalize_training_run(self, *, run_id: str | None, training_job_status: Any) -> str | None:
return None
@dataclass(frozen=True)
class MlflowTracker:
tracking_uri: str
experiment_name: str
registered_model_name: str
register_trained_models: bool
@classmethod
def from_config(cls, cfg: Config) -> Tracker:
if cfg.mlflow.mode is MlflowMode.disabled:
return NoopTracker()
os.environ.setdefault("MLFLOW_SUPPRESS_PRINTING_URL_TO_STDOUT", "true")
tracking_server_name = cfg.effective_mlflow_tracking_server_name
if not tracking_server_name:
raise RuntimeError("MLflow tracking server name could not be resolved.")
tracking_uri = aws_mlflow.get_tracking_server_arn(
cfg.aws.region,
cfg.aws.profile,
tracking_server_name,
)
mlflow.set_tracking_uri(tracking_uri)
mlflow.set_experiment(cfg.mlflow.experiment_name)
return cls(
tracking_uri=tracking_uri,
experiment_name=cfg.mlflow.experiment_name,
registered_model_name=cfg.mlflow.registered_model_name,
register_trained_models=cfg.mlflow.register_trained_models,
)
def start_training_run(self, training_job: Any, *, region: str, profile: str, role_arn: str) -> str | None:
run = mlflow.start_run(run_name=training_job.job_name)
run_id = str(run.info.run_id)
params = {
"aws.region": region,
"aws.profile": profile,
"sagemaker.role_arn": role_arn,
"sagemaker.job_name": training_job.job_name,
"sagemaker.training_image": training_job.image_uri,
"sagemaker.instance_type": training_job.instance_type,
"sagemaker.instance_count": training_job.instance_count,
"sagemaker.s3_train_uri": training_job.s3_train_uri,
"sagemaker.s3_output_path": training_job.s3_output_path,
"sagemaker.entry_point": training_job.entry_point,
"sagemaker.source_dir": training_job.source_dir,
}
self._log_params(params)
self._log_params({f"hyperparameters.{key}": value for key, value in training_job.hyperparameters.items()})
mlflow.set_tags(
{
"qc_cli.stage": "experiment",
"qc_cli.artifact_kind": "trained_source",
"qc_cli.source": "sagemaker",
"qc_cli.command": "train start",
"sagemaker.job_name": training_job.job_name,
}
)
mlflow.end_run()
return run_id
def finalize_training_run(self, *, run_id: str | None, training_job_status: Any) -> str | None:
if not run_id:
return None
with mlflow.start_run(run_id=run_id):
self._log_params(
{
"sagemaker.training_status": training_job_status.status,
"sagemaker.created_at": training_job_status.created,
"sagemaker.modified_at": training_job_status.modified,
"sagemaker.model_artifacts": training_job_status.model_artifacts,
"sagemaker.failure_reason": training_job_status.failure_reason,
}
)
self._log_final_metrics(training_job_status.raw)
mlflow.set_tag("qc_cli.command", "train status")
if training_job_status.status != "Completed" or not training_job_status.model_artifacts:
mlflow.set_tag("qc_cli.training_terminal_status", training_job_status.status)
return None
if not self.register_trained_models:
return None
client = MlflowClient()
self._ensure_registered_model(client, self.registered_model_name)
version = client.create_model_version(
name=self.registered_model_name,
source=training_job_status.model_artifacts,
run_id=run_id,
tags={
"qc_cli.stage": "experiment",
"qc_cli.artifact_kind": "trained_source",
"qc_cli.source": "sagemaker",
"sagemaker.job_name": training_job_status.name,
},
)
version_number = str(version.version)
client.set_registered_model_alias(self.registered_model_name, "experiment-latest", version_number)
mlflow.set_tag("qc_cli.registered_model_name", self.registered_model_name)
mlflow.set_tag("qc_cli.registered_model_version", version_number)
return version_number
def _log_params(self, params: dict[str, Any]) -> None:
cleaned = {key: str(value) for key, value in params.items() if value is not None}
if cleaned:
mlflow.log_params(cleaned)
def _log_final_metrics(self, training_job: dict[str, Any]) -> None:
metrics = {}
for metric in training_job.get("FinalMetricDataList", []):
name = metric.get("MetricName")
value = metric.get("Value")
if name and value is not None:
metrics[str(name)] = float(value)
if metrics:
mlflow.log_metrics(metrics)
def _ensure_registered_model(self, client: MlflowClient, name: str) -> None:
try:
client.get_registered_model(name)
except Exception:
client.create_registered_model(name)

1970
uv.lock generated

File diff suppressed because it is too large Load Diff