4 Commits

Author SHA1 Message Date
f1f5dcbed7 update 2026-06-09 10:01:09 -04:00
75f66f81c1 initial version to train yolo model 2026-06-09 09:15:35 -04:00
samirodr
5360a482fc update 2026-06-08 14:59:44 -04:00
samirodr
6a560a8610 match 2026-06-08 14:54:13 -04:00
13 changed files with 359 additions and 712 deletions

View File

@@ -199,8 +199,6 @@ When a step runs in the current command, `upload` passes its returned model ID d
`ai-hub compile` resolves model sources in this order: `--model-id`, explicit source options (`--onnx-path`, `--model-s3-uri`, `--from-job`), last quantized model from state, then the last training job from local state. `ai-hub download` is separate because downloading the optimized artifact is outside the four-step Workbench upload loop. `ai-hub compile` resolves model sources in this order: `--model-id`, explicit source options (`--onnx-path`, `--model-s3-uri`, `--from-job`), last quantized model from state, then the last training job from local state. `ai-hub download` is separate because downloading the optimized artifact is outside the four-step Workbench upload loop.
When MLflow is enabled, AI Hub job-producing commands (`quantize`, `compile`, `validate`, `profile`, and `upload`) log AI Hub metadata to MLflow. Each command execution receives a `qc_cli.aihub_submission_id`; all steps inside one `ai-hub upload` share that submission ID. Runs are nested under the MLflow run for the resolved source model when the CLI can prove that source from local state, such as `--from-job` or a model produced by a prior tracked AI Hub step. Otherwise, AI Hub runs are standalone. `validate` also logs output summaries, and `profile` logs profile metrics plus the raw profile JSON. `ai-hub download` does not create an MLflow run because it does not submit or measure an AI Hub job.
AI Hub authentication currently uses the local `qai-hub` SDK configuration. A planned follow-up is to support AWS Systems Manager Parameter Store `SecureString` for team-managed tokens, where `config.yaml` stores only a parameter name such as `/qc-cli/aihub/token`, AWS KMS encrypts the token at rest, and the CLI retrieves it at runtime with `ssm:GetParameter` plus `kms:Decrypt` permissions. AI Hub authentication currently uses the local `qai-hub` SDK configuration. A planned follow-up is to support AWS Systems Manager Parameter Store `SecureString` for team-managed tokens, where `config.yaml` stores only a parameter name such as `/qc-cli/aihub/token`, AWS KMS encrypts the token at rest, and the CLI retrieves it at runtime with `ssm:GetParameter` plus `kms:Decrypt` permissions.
## Model lifecycle ## Model lifecycle

View File

@@ -13,18 +13,10 @@ This example takes the ONNX model produced by the SageMaker training example and
Run the training example first and wait for it to complete: Run the training example first and wait for it to complete:
```bash ```bash
bash examples/training/run_training.sh --config config.yaml --wait examples/training/run_training.sh --wait
``` ```
If the dataset is already uploaded to S3, use: The `config.yaml` file must include AI Hub settings:
```bash
bash examples/training/run_training.sh --config config.yaml --skip-upload --wait
```
The training artifact must contain a static-shape `model.onnx`. The training example exports an input named `input` with shape `1x3x160x160`.
Your `config.yaml` must include AI Hub settings:
```yaml ```yaml
aihub: aihub:
@@ -36,16 +28,20 @@ aihub:
output_dir: build/qai-hub output_dir: build/qai-hub
``` ```
You also need local Qualcomm AI Hub SDK authentication configured. Finally, the user needs to authenticate with Qualcomm AI Hub using:
```bash
qai-hub configure --api_token
```
## Prepare Inputs ## Prepare Inputs
AI Hub does not consume the raw JPG training images directly. It needs NumPy tensors that match the ONNX model input shape and preprocessing. AI Hub does not consume the raw JPG training images directly. It needs NumPy tensors that match the ONNX model input shape and preprocessing.
Generate calibration and validation inputs: To generate calibration and validation inputs:
```bash ```bash
uv run python examples/ai-hub/prepare_inputs.py python examples/ai-hub/prepare_inputs.py
``` ```
This writes: This writes:
@@ -61,58 +57,23 @@ The script applies the same image preprocessing used by the training example:
- convert to channel-first `1x3x160x160` - convert to channel-first `1x3x160x160`
- normalize with ImageNet mean and standard deviation - normalize with ImageNet mean and standard deviation
Useful options: ## Upload Model to Qualcomm Workbench
The model can be uploaded to Qualcomm Workbench using:
```bash ```bash
uv run python examples/ai-hub/prepare_inputs.py \ qc-cli ai-hub upload examples/training/data/aihub_calibration examples/training/data/inputs.npz
--dataset-dir examples/training/data/flower_photos_sagemaker \
--calibration-dir examples/training/data/aihub_calibration \
--input-file examples/training/data/inputs.npz \
--samples 16
``` ```
## Run AI Hub The first argument is the calibration path for the model and the second argument is the input file, both of which were created by the `prepare_inputs.py` script. For more details, add `--help` after the `upload` command.
After training completes and inputs are prepared: The `upload` command runs the following commands in order:
1. `qc-cli ai-hub quantize`
2. `qc-cli ai-hub compile`
3. `qc-cli ai-hub validate`
4. `qc-cli ai-hub profile`
Finally the user can download the model from AI Workbench using the command
```bash ```bash
bash examples/ai-hub/run_ai_hub.sh --config config.yaml qc-cli ai-hub download
``` ```
By default, the script uses the last SageMaker training job recorded in `.qc-cli.json`. It downloads that job's `model.tar.gz`, extracts `model.onnx`, runs the AI Hub workflow, and downloads the compiled artifact.
To use a specific training job:
```bash
bash examples/ai-hub/run_ai_hub.sh \
--config config.yaml \
--from-job qc-cli-YYYYMMDD-HHMMSS
```
To resume from a later Workbench step:
```bash
bash examples/ai-hub/run_ai_hub.sh \
--config config.yaml \
--from-step validate
```
To skip downloading the compiled artifact:
```bash
bash examples/ai-hub/run_ai_hub.sh \
--config config.yaml \
--skip-download
```
## Troubleshooting
If AI Hub reports dynamic input shapes, rerun training with the current training source. AI Hub quantization requires the exported ONNX model to use static input shapes.
If `run_ai_hub.sh` reports missing calibration or input files, run:
```bash
uv run python examples/ai-hub/prepare_inputs.py
```
If validation fails with a missing input name, make sure `config.yaml` and the generated `.npz` both use `input` as the input name.

0
examples/ai-hub/prepare_inputs.py Executable file → Normal file
View File

View File

@@ -1,156 +0,0 @@
#!/usr/bin/env bash
set -euo pipefail
CONFIG_PATH="config.yaml"
CALIBRATION_PATH="examples/training/data/aihub_calibration"
INPUT_FILE="examples/training/data/inputs.npz"
FROM_STEP="quantize"
FROM_JOB=""
MODEL_S3_URI=""
ONNX_PATH=""
INPUT_NAME=""
DOWNLOAD=true
OUTPUT_PATH=""
usage() {
cat <<EOF
Usage: $0 [options]
Options:
--config PATH Path to qc-cli config file. Default: config.yaml
--calibration PATH Calibration .npz file or directory of .npy samples.
Default: ${CALIBRATION_PATH}
--input-file PATH Validation .npz or .npy inputs. Default: ${INPUT_FILE}
--from-step STEP Resume upload from: quantize, compile, validate, profile.
Default: ${FROM_STEP}
--from-job NAME SageMaker training job whose model artifact should upload.
Defaults to the last training job in local qc-cli state.
--model-s3-uri URI S3 URI of model.tar.gz to upload.
--onnx-path PATH Local ONNX path or ONNX path inside extracted artifact.
--input-name NAME Input name for .npy validation files.
--skip-download Do not download the compiled AI Hub artifact after upload.
--output PATH Destination file for ai-hub download.
-h, --help Show this help.
EOF
}
while [[ $# -gt 0 ]]; do
case "$1" in
--config)
CONFIG_PATH="$2"
shift 2
;;
--calibration)
CALIBRATION_PATH="$2"
shift 2
;;
--input-file)
INPUT_FILE="$2"
shift 2
;;
--from-step)
FROM_STEP="$2"
shift 2
;;
--from-job)
FROM_JOB="$2"
shift 2
;;
--model-s3-uri)
MODEL_S3_URI="$2"
shift 2
;;
--onnx-path)
ONNX_PATH="$2"
shift 2
;;
--input-name)
INPUT_NAME="$2"
shift 2
;;
--skip-download)
DOWNLOAD=false
shift
;;
--output)
OUTPUT_PATH="$2"
shift 2
;;
-h|--help)
usage
exit 0
;;
*)
echo "Unknown option: $1" >&2
usage >&2
exit 1
;;
esac
done
if [[ ! -f "${CONFIG_PATH}" ]]; then
echo "Config not found: ${CONFIG_PATH}" >&2
exit 1
fi
case "${FROM_STEP}" in
quantize|compile|validate|profile)
;;
*)
echo "--from-step must be one of: quantize, compile, validate, profile" >&2
exit 1
;;
esac
if [[ ! -e "${CALIBRATION_PATH}" ]]; then
echo "Calibration path not found: ${CALIBRATION_PATH}" >&2
echo "Pass --calibration with a .npz file or directory of .npy samples." >&2
exit 1
fi
if [[ ! -f "${INPUT_FILE}" ]]; then
echo "Input file not found: ${INPUT_FILE}" >&2
echo "Pass --input-file with a validation .npz or .npy file." >&2
exit 1
fi
run() {
echo "+ $*"
"$@"
}
UPLOAD_ARGS=(
"${CALIBRATION_PATH}"
"${INPUT_FILE}"
--from-step "${FROM_STEP}"
--config "${CONFIG_PATH}"
)
if [[ -n "${FROM_JOB}" ]]; then
UPLOAD_ARGS+=(--from-job "${FROM_JOB}")
fi
if [[ -n "${MODEL_S3_URI}" ]]; then
UPLOAD_ARGS+=(--model-s3-uri "${MODEL_S3_URI}")
fi
if [[ -n "${ONNX_PATH}" ]]; then
UPLOAD_ARGS+=(--onnx-path "${ONNX_PATH}")
fi
if [[ -n "${INPUT_NAME}" ]]; then
UPLOAD_ARGS+=(--input-name "${INPUT_NAME}")
fi
run uv run qc-cli ai-hub upload "${UPLOAD_ARGS[@]}"
if [[ "${DOWNLOAD}" == false ]]; then
exit 0
fi
DOWNLOAD_ARGS=(--config "${CONFIG_PATH}")
if [[ -n "${OUTPUT_PATH}" ]]; then
DOWNLOAD_ARGS+=(--output "${OUTPUT_PATH}")
fi
run uv run qc-cli ai-hub download "${DOWNLOAD_ARGS[@]}"

View File

@@ -0,0 +1,186 @@
# YOLO26 Electric Meter Detection Example
This example trains a YOLO26 object detection model on the Roboflow Universe electric meter dataset using the existing `qc-cli` SageMaker training flow.
The workflow is intentionally command driven. Run each step yourself so you can inspect the dataset, update `config.yaml`, and decide when to submit the SageMaker job.
Dataset:
```text
https://universe.roboflow.com/kemals-workspace-kbc8l/electric-meter-detection-o4tfi/dataset/1
```
## Prerequisites
- Install or sync the project dependencies: `uv sync`
- The virtual environment is activated.
- AWS credentials configured for the profile in `config.yaml`
- Infrastructure already deployed with `qc-cli infra setup`
## 1. Download The Dataset
Register or sign in to Roboflow, then open the dataset page:
```text
https://universe.roboflow.com/kemals-workspace-kbc8l/electric-meter-detection-o4tfi/dataset/1
```
Download the dataset in YOLOv26 format from the Roboflow UI, then extract the downloaded archive into:
```text
examples/meter-detection/data/electric-meter-detection
```
The `data.yaml` file should be directly under that folder:
```text
examples/meter-detection/data/electric-meter-detection/data.yaml
```
Do not move `data.yaml` into the `train/` split folder.
After extracting, confirm the dataset has a YOLO data file and image splits:
```bash
find examples/meter-detection/data/electric-meter-detection -maxdepth 2 -type d | sort
find examples/meter-detection/data/electric-meter-detection -name data.yaml -print
```
Open `examples/meter-detection/data/electric-meter-detection/data.yaml` and make sure the split paths are relative to that folder:
```yaml
path: .
train: train/images
val: valid/images
test: test/images
```
If your downloaded dataset does not include a `test/` folder, remove the `test:` line.
The expected layout is similar to:
```text
examples/meter-detection/data/electric-meter-detection/
data.yaml
train/
valid/
test/
```
## 2. Configure SageMaker Training
Update `config.yaml` so the training section points at this example's source directory:
```yaml
sagemaker:
training:
image_uri: 763104351884.dkr.ecr.us-east-1.amazonaws.com/pytorch-training:2.6-cpu-py312-ubuntu22.04-sagemaker-v1
instance_type: ml.g4dn.xlarge
instance_count: 1
source_dir: examples/meter-detection/source
entry_point: train.py
hyperparameters:
model: yolo26n.pt
epochs: 25
imgsz: 640
batch: 16
workers: 2
```
Use `yolo26n.pt` for a lightweight first YOLO26 run. If those weights are unavailable in the installed Ultralytics package, use `yolo11n.pt` as the established fallback:
```yaml
model: yolo11n.pt
```
The `source/requirements.txt` file is installed by the SageMaker PyTorch container before running `train.py`.
For a CPU smoke test, use a CPU instance and reduce the workload:
```yaml
sagemaker:
training:
image_uri: 763104351884.dkr.ecr.us-east-1.amazonaws.com/pytorch-training:2.6-cpu-py312-ubuntu22.04-sagemaker-v1
instance_type: ml.m4.xlarge
instance_count: 1
source_dir: examples/meter-detection/source
entry_point: train.py
hyperparameters:
model: yolo26n.pt
epochs: 1
imgsz: 320
batch: 4
workers: 2
```
## 3. Check Infrastructure
Confirm the CLI can see the configured SageMaker role and S3 bucket:
```bash
qc-cli infra status --config config.yaml
```
## 4. Upload The Dataset
Upload the downloaded Roboflow dataset to the `s3.data_prefix` configured in `config.yaml`:
```bash
qc-cli upload examples/meter-detection/data/electric-meter-detection
```
Directory uploads preserve paths relative to the uploaded directory, so SageMaker receives the dataset root with `data.yaml` plus the split directories.
In SageMaker, this uploaded dataset root is mounted at `/opt/ml/input/data/train`. That `train` path is the SageMaker channel name, not the YOLO `train/` split folder.
## 5. Start Training
Submit the SageMaker training job:
```bash
qc-cli train start
```
The command prints the submitted SageMaker job name. Check progress with:
```bash
qc-cli train status
```
Or pass the job name explicitly:
```bash
qc-cli train status qc-cli-YYYYMMDD-HHMMSS
```
## Outputs
When the job completes, SageMaker packages the files written under `/opt/ml/model` into `model.tar.gz`.
This example writes:
```text
best.pt
model.onnx
metrics.json
```
The archive is stored under the configured `s3.model_prefix`.
## Training Hyperparameters
Values under `sagemaker.training.hyperparameters` are passed to `source/train.py` as command-line arguments.
| Name | Type | Default | Description |
|---|---:|---:|---|
| `model` | string | `yolo26n.pt` | Ultralytics model weights or model YAML. |
| `epochs` | int | `25` | Number of training epochs. |
| `imgsz` | int | `640` | Square training image size. |
| `batch` | int | `16` | Images per training batch. |
| `workers` | int | `2` | DataLoader worker count. |
| `patience` | int | `20` | Early stopping patience. |
| `device` | string | auto | Optional Ultralytics device value such as `0` or `cpu`. |
| `data-yaml` | string | auto | Optional path to `data.yaml`; normally discovered from the uploaded dataset root. |
| `dataset-dir` | string | `SM_CHANNEL_TRAIN` | Uploaded dataset root mounted by SageMaker. |
Do not set `dataset-dir` or `model-dir` in normal SageMaker runs. SageMaker sets those automatically through `SM_CHANNEL_TRAIN` and `SM_MODEL_DIR`.

View File

@@ -0,0 +1,3 @@
ultralytics>=8.3.0
pyyaml>=6.0.3
onnx>=1.16.0

View File

@@ -0,0 +1,124 @@
#!/usr/bin/env python3
"""SageMaker entry point for YOLO electric meter detection training."""
from __future__ import annotations
import argparse
import json
import os
import shutil
from pathlib import Path
from typing import Any
import yaml
from ultralytics import YOLO # type: ignore[reportMissingImports]
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser()
parser.add_argument("--model", default="yolo26n.pt")
parser.add_argument("--epochs", type=int, default=25)
parser.add_argument("--imgsz", type=int, default=640)
parser.add_argument("--batch", type=int, default=16)
parser.add_argument("--workers", type=int, default=2)
parser.add_argument("--patience", type=int, default=20)
parser.add_argument("--device", default=None)
parser.add_argument("--data-yaml", default=None)
parser.add_argument("--dataset-dir", default=os.environ.get("SM_CHANNEL_TRAIN", "/opt/ml/input/data/train"))
parser.add_argument("--train-dir", dest="dataset_dir", help=argparse.SUPPRESS)
parser.add_argument("--model-dir", default=os.environ.get("SM_MODEL_DIR", "/opt/ml/model"))
return parser.parse_args()
def find_data_yaml(dataset_dir: Path, explicit_path: str | None) -> Path:
if explicit_path:
data_yaml = Path(explicit_path)
if data_yaml.is_file():
return data_yaml
raise FileNotFoundError(f"Configured data.yaml does not exist: {data_yaml}")
matches = sorted(dataset_dir.rglob("data.yaml"))
if not matches:
raise FileNotFoundError(f"Could not find data.yaml under {dataset_dir}")
if len(matches) > 1:
print(f"Found multiple data.yaml files; using {matches[0]}")
return matches[0]
def prepare_data_yaml(data_yaml: Path) -> Path:
"""Write a SageMaker-local data file rooted at the uploaded dataset."""
dataset_root = data_yaml.parent
data = yaml.safe_load(data_yaml.read_text(encoding="utf-8"))
if not isinstance(data, dict):
raise ValueError(f"Expected a mapping in {data_yaml}")
normalized = dict(data)
normalized["path"] = str(dataset_root)
if "val" not in normalized and "valid" in normalized:
normalized["val"] = normalized.pop("valid")
prepared_path = dataset_root / "data.sagemaker.yaml"
prepared_path.write_text(yaml.safe_dump(normalized, sort_keys=False), encoding="utf-8")
print(f"Prepared dataset config: {prepared_path}")
return prepared_path
def copy_if_exists(source: Path, destination: Path) -> None:
if source.exists():
shutil.copy2(source, destination)
print(f"Saved {destination}")
def main() -> None:
args = parse_args()
dataset_dir = Path(args.dataset_dir)
model_dir = Path(args.model_dir)
model_dir.mkdir(parents=True, exist_ok=True)
data_yaml = prepare_data_yaml(find_data_yaml(dataset_dir, args.data_yaml))
model = YOLO(args.model)
train_kwargs: dict[str, Any] = {
"data": str(data_yaml),
"epochs": args.epochs,
"imgsz": args.imgsz,
"batch": args.batch,
"workers": args.workers,
"patience": args.patience,
"project": str(model_dir / "runs"),
"name": "train",
"exist_ok": True,
}
if args.device:
train_kwargs["device"] = args.device
results = model.train(**train_kwargs)
save_dir = Path(results.save_dir)
best_pt = save_dir / "weights" / "best.pt"
last_pt = save_dir / "weights" / "last.pt"
trained_weights = best_pt if best_pt.exists() else last_pt
if not trained_weights.exists():
raise FileNotFoundError(f"Could not find trained weights in {save_dir / 'weights'}")
copy_if_exists(trained_weights, model_dir / "best.pt")
trained_model = YOLO(str(trained_weights))
onnx_path = Path(trained_model.export(format="onnx", imgsz=args.imgsz))
copy_if_exists(onnx_path, model_dir / "model.onnx")
metrics = {
"model": args.model,
"epochs": args.epochs,
"imgsz": args.imgsz,
"batch": args.batch,
"workers": args.workers,
"patience": args.patience,
"data_yaml": str(data_yaml),
"weights": str(trained_weights),
"onnx": str(onnx_path),
}
(model_dir / "metrics.json").write_text(json.dumps(metrics, indent=2), encoding="utf-8")
print(f"Saved model artifacts to {model_dir}")
if __name__ == "__main__":
main()

0
src/cloud/__init__.py Normal file
View File

View File

@@ -1,11 +1,8 @@
import json
from collections.abc import Mapping, Sequence from collections.abc import Mapping, Sequence
from dataclasses import asdict, dataclass
from datetime import datetime from datetime import datetime
from enum import StrEnum from enum import StrEnum
from pathlib import Path from pathlib import Path
from typing import Any from typing import Any
from uuid import uuid4
import qai_hub.hub as hub import qai_hub.hub as hub
import typer import typer
@@ -16,9 +13,8 @@ from src.commands.utils import CONFIG_OPT, CONSOLE, load_cfg
from src.config import Config from src.config import Config
from src.qualcomm import aihub_jobs from src.qualcomm import aihub_jobs
from src.qualcomm.artifacts import resolve_onnx from src.qualcomm.artifacts import resolve_onnx
from src.tracking.mlflow import AIHubSourceProvenance, AIHubStepRecord, MlflowTracker, Tracker
app = typer.Typer(help="Quantize, compile, validate, profile, and download models with Qualcomm AI Hub") app = typer.Typer(help="Quantize, compile, validate, profile, and download models with Qualcomm Workbench")
_RUNTIME_EXTENSIONS = { _RUNTIME_EXTENSIONS = {
"tflite": "tflite", "tflite": "tflite",
@@ -34,16 +30,6 @@ class UploadStep(StrEnum):
profile = "profile" profile = "profile"
@dataclass(frozen=True)
class AIHubStepResult:
job: Any
job_id: str
model_id: str | None = None
output_dir: Path | None = None
outputs: Mapping[str, Any] | None = None
profile: Mapping[str, Any] | None = None
def _input_specs(cfg: Config) -> dict[str, tuple[tuple[int, ...], str]]: def _input_specs(cfg: Config) -> dict[str, tuple[tuple[int, ...], str]]:
specs = {name: (tuple(shape), dtype) for name, (shape, dtype) in cfg.aihub.input_specs.items()} specs = {name: (tuple(shape), dtype) for name, (shape, dtype) in cfg.aihub.input_specs.items()}
if not specs: if not specs:
@@ -126,116 +112,6 @@ def _device_selector(device: Device) -> str:
return ", ".join(parts) if parts else "empty selector" return ", ".join(parts) if parts else "empty selector"
def _submission_id() -> str:
return f"{datetime.now().strftime('%Y%m%d-%H%M%S')}-{uuid4().hex[:8]}"
def _tracker(cfg: Config) -> Tracker:
try:
return MlflowTracker.from_config(cfg)
except Exception as e:
CONSOLE.print(f"[red]MLflow setup failed: {e}[/red]")
raise typer.Exit(1)
def _training_parent_run_id(config_path: str, training_job: str | None) -> str | None:
if not training_job:
return None
run_id = state_ops.store(config_path).get_training_job(training_job).get("mlflow_run_id")
return str(run_id) if run_id else None
def _source_to_state(source: AIHubSourceProvenance) -> dict[str, Any]:
return {key: value for key, value in asdict(source).items() if value is not None}
def _source_from_state(value: Mapping[str, Any]) -> AIHubSourceProvenance:
return AIHubSourceProvenance(
kind=str(value.get("kind", "aihub_model")),
parent_run_id=str(value["parent_run_id"]) if value.get("parent_run_id") else None,
uri=str(value["uri"]) if value.get("uri") else None,
path=str(value["path"]) if value.get("path") else None,
aihub_model_id=str(value["aihub_model_id"]) if value.get("aihub_model_id") else None,
training_job=str(value["training_job"]) if value.get("training_job") else None,
)
def _source_for_aihub_model(config_path: str, model_id: str) -> AIHubSourceProvenance:
stored = state_ops.store(config_path).get_aihub_model_provenance(model_id)
if stored:
return _source_from_state(stored)
return AIHubSourceProvenance(kind="aihub_model", aihub_model_id=model_id)
def _source_for_resolved_onnx(
config_path: str,
*,
resolved_path: Path,
model_artifact: str | None,
from_job: str | None,
model_s3_uri: str | None,
onnx_path: str | None,
implicit_training_job: str | None,
implicit_model_artifact: str | None,
) -> AIHubSourceProvenance:
if onnx_path and Path(onnx_path).exists() and not from_job and not model_s3_uri:
return AIHubSourceProvenance(kind="local_onnx", path=str(resolved_path))
training_job = from_job
if not training_job and model_artifact and implicit_model_artifact and model_artifact == implicit_model_artifact:
training_job = implicit_training_job
if not training_job and not model_s3_uri and not onnx_path:
training_job = implicit_training_job
return AIHubSourceProvenance(
kind="sagemaker_model_artifact" if model_artifact else "local_onnx",
parent_run_id=_training_parent_run_id(config_path, training_job),
uri=model_artifact,
path=str(resolved_path) if not model_artifact else None,
training_job=training_job,
)
def _model_id_or_state_with_source(
config_path: str,
model_id: str | None,
*,
quantized: bool = False,
) -> tuple[str, AIHubSourceProvenance]:
resolved_model_id = _model_id_or_state(config_path, model_id, quantized=quantized)
return resolved_model_id, _source_for_aihub_model(config_path, resolved_model_id)
def _record_step(
cfg: Config,
tracker: Tracker,
*,
result: AIHubStepResult,
source: AIHubSourceProvenance,
step: str,
submission_id: str,
command: str,
options: str | None = None,
) -> None:
tracker.record_aihub_step(
AIHubStepRecord(
step=step,
submission_id=submission_id,
command=command,
source=source,
job=result.job,
job_id=result.job_id,
model_id=result.model_id,
target_runtime=cfg.aihub.target_runtime,
device=_device_selector(cfg.aihub.device),
options=options,
output_dir=result.output_dir,
outputs=result.outputs,
profile=result.profile,
)
)
def _validate_device(cfg: Config) -> None: def _validate_device(cfg: Config) -> None:
device = cfg.aihub.device device = cfg.aihub.device
try: try:
@@ -259,38 +135,23 @@ def _quantize_step(
from_job: str | None, from_job: str | None,
model_s3_uri: str | None, model_s3_uri: str | None,
onnx_path: str | None, onnx_path: str | None,
tracker: Tracker, ) -> str:
submission_id: str,
) -> AIHubStepResult:
st = state_ops.store(config_path) st = state_ops.store(config_path)
specs = _input_specs(cfg) specs = _input_specs(cfg)
implicit_training_job = st.get_last_training_job()
implicit_model_artifact = st.get_last_model_artifact()
try: try:
resolved = resolve_onnx( resolved = resolve_onnx(
cfg=cfg, cfg=cfg,
output_dir=cfg.aihub.output_dir, output_dir=cfg.aihub.output_dir,
from_job=from_job, from_job=from_job,
model_s3_uri=model_s3_uri or implicit_model_artifact, model_s3_uri=model_s3_uri or st.get_last_model_artifact(),
onnx_path=onnx_path, onnx_path=onnx_path,
last_training_job=implicit_training_job, last_training_job=st.get_last_training_job(),
) )
calibration_data = _load_calibration(calibration_path, specs) calibration_data = _load_calibration(calibration_path, specs)
except (FileNotFoundError, ValueError) as e: except (FileNotFoundError, ValueError) as e:
CONSOLE.print(f"[red]{e}[/red]") CONSOLE.print(f"[red]{e}[/red]")
raise typer.Exit(1) raise typer.Exit(1)
source = _source_for_resolved_onnx(
config_path,
resolved_path=resolved.onnx_path,
model_artifact=resolved.model_artifact,
from_job=from_job,
model_s3_uri=model_s3_uri,
onnx_path=onnx_path,
implicit_training_job=implicit_training_job,
implicit_model_artifact=implicit_model_artifact,
)
try: try:
result = aihub_jobs.submit_quantize_job( result = aihub_jobs.submit_quantize_job(
resolved.onnx_path, resolved.onnx_path,
@@ -308,25 +169,9 @@ def _quantize_step(
last_quantize_job_id=result["job_id"], last_quantize_job_id=result["job_id"],
last_quantized_model_id=result["model_id"], last_quantized_model_id=result["model_id"],
) )
st.update_aihub_model_provenance(str(result["model_id"]), _source_to_state(source))
step_result = AIHubStepResult(
job=result["job"],
job_id=str(result["job_id"]),
model_id=str(result["model_id"]),
)
_record_step(
cfg,
tracker,
result=step_result,
source=source,
step="quantize",
submission_id=submission_id,
command="ai-hub quantize",
options=cfg.aihub.quantize_options,
)
CONSOLE.print(f"[green]✓[/green] Quantize job: [bold]{result['job_id']}[/bold]") CONSOLE.print(f"[green]✓[/green] Quantize job: [bold]{result['job_id']}[/bold]")
CONSOLE.print(f"[green]✓[/green] Quantized model: [bold]{result['model_id']}[/bold]") CONSOLE.print(f"[green]✓[/green] Quantized model: [bold]{result['model_id']}[/bold]")
return step_result return str(result["model_id"])
def _compile_step( def _compile_step(
@@ -338,25 +183,19 @@ def _compile_step(
onnx_path: str | None, onnx_path: str | None,
*, *,
prefer_quantized: bool, prefer_quantized: bool,
tracker: Tracker, ) -> str:
submission_id: str,
) -> AIHubStepResult:
st = state_ops.store(config_path) st = state_ops.store(config_path)
_validate_device(cfg) _validate_device(cfg)
specs = _input_specs(cfg) specs = _input_specs(cfg)
model: Any model: Any
model_artifact: str | None = None model_artifact: str | None = None
source: AIHubSourceProvenance
has_explicit_source = bool(from_job or model_s3_uri or onnx_path) has_explicit_source = bool(from_job or model_s3_uri or onnx_path)
if model_id: if model_id:
model = model_id model = model_id
source = _source_for_aihub_model(config_path, model_id)
elif prefer_quantized and not has_explicit_source and st.get_last_quantized_model_id(): elif prefer_quantized and not has_explicit_source and st.get_last_quantized_model_id():
model = st.get_last_quantized_model_id() model = st.get_last_quantized_model_id()
source = _source_for_aihub_model(config_path, str(model))
else: else:
implicit_training_job = st.get_last_training_job()
try: try:
resolved = resolve_onnx( resolved = resolve_onnx(
cfg=cfg, cfg=cfg,
@@ -364,23 +203,13 @@ def _compile_step(
from_job=from_job, from_job=from_job,
model_s3_uri=model_s3_uri, model_s3_uri=model_s3_uri,
onnx_path=onnx_path, onnx_path=onnx_path,
last_training_job=implicit_training_job, last_training_job=st.get_last_training_job(),
) )
except (FileNotFoundError, ValueError) as e: except (FileNotFoundError, ValueError) as e:
CONSOLE.print(f"[red]{e}[/red]") CONSOLE.print(f"[red]{e}[/red]")
raise typer.Exit(1) raise typer.Exit(1)
model = resolved.onnx_path model = resolved.onnx_path
model_artifact = resolved.model_artifact model_artifact = resolved.model_artifact
source = _source_for_resolved_onnx(
config_path,
resolved_path=resolved.onnx_path,
model_artifact=resolved.model_artifact,
from_job=from_job,
model_s3_uri=model_s3_uri,
onnx_path=onnx_path,
implicit_training_job=implicit_training_job,
implicit_model_artifact=st.get_last_model_artifact(),
)
try: try:
result = aihub_jobs.submit_compile_job( result = aihub_jobs.submit_compile_job(
@@ -403,25 +232,9 @@ def _compile_step(
if model_artifact: if model_artifact:
updates["last_model_artifact"] = model_artifact updates["last_model_artifact"] = model_artifact
st.update(**updates) st.update(**updates)
st.update_aihub_model_provenance(str(result["model_id"]), _source_to_state(source))
step_result = AIHubStepResult(
job=result["job"],
job_id=str(result["job_id"]),
model_id=str(result["model_id"]),
)
_record_step(
cfg,
tracker,
result=step_result,
source=source,
step="compile",
submission_id=submission_id,
command="ai-hub compile",
options=cfg.aihub.compile_options,
)
CONSOLE.print(f"[green]✓[/green] Compile job: [bold]{result['job_id']}[/bold]") CONSOLE.print(f"[green]✓[/green] Compile job: [bold]{result['job_id']}[/bold]")
CONSOLE.print(f"[green]✓[/green] Compiled model: [bold]{result['model_id']}[/bold]") CONSOLE.print(f"[green]✓[/green] Compiled model: [bold]{result['model_id']}[/bold]")
return step_result return str(result["model_id"])
def _validate_step( def _validate_step(
@@ -430,12 +243,10 @@ def _validate_step(
input_file: Path, input_file: Path,
model_id: str | None, model_id: str | None,
input_name: str | None, input_name: str | None,
tracker: Tracker, ) -> str:
submission_id: str,
) -> AIHubStepResult:
_validate_device(cfg) _validate_device(cfg)
specs = _input_specs(cfg) specs = _input_specs(cfg)
resolved_model_id, source = _model_id_or_state_with_source(config_path, model_id) resolved_model_id = _model_id_or_state(config_path, model_id)
try: try:
inputs = _load_inputs(input_file, specs, input_name) inputs = _load_inputs(input_file, specs, input_name)
except (FileNotFoundError, ValueError) as e: except (FileNotFoundError, ValueError) as e:
@@ -457,40 +268,18 @@ def _validate_step(
raise typer.Exit(1) raise typer.Exit(1)
state_ops.store(config_path).update(last_inference_job_id=result["job_id"]) state_ops.store(config_path).update(last_inference_job_id=result["job_id"])
outputs = result.get("outputs")
step_result = AIHubStepResult(
job=result["job"],
job_id=str(result["job_id"]),
model_id=resolved_model_id,
output_dir=out_dir,
outputs=outputs if isinstance(outputs, Mapping) else None,
)
_record_step(
cfg,
tracker,
result=step_result,
source=source,
step="validate",
submission_id=submission_id,
command="ai-hub validate",
)
CONSOLE.print(f"[green]✓[/green] Inference job: [bold]{result['job_id']}[/bold]") CONSOLE.print(f"[green]✓[/green] Inference job: [bold]{result['job_id']}[/bold]")
outputs = result.get("outputs")
if isinstance(outputs, dict): if isinstance(outputs, dict):
for name, value in outputs.items(): for name, value in outputs.items():
CONSOLE.print(f" {name}: shape={getattr(value, 'shape', '?')}") CONSOLE.print(f" {name}: shape={getattr(value, 'shape', '?')}")
CONSOLE.print(f"Outputs: [cyan]{out_dir}[/cyan]") CONSOLE.print(f"Outputs: [cyan]{out_dir}[/cyan]")
return step_result return str(result["job_id"])
def _profile_step( def _profile_step(cfg: Config, config_path: str, model_id: str | None) -> str:
cfg: Config,
config_path: str,
model_id: str | None,
tracker: Tracker,
submission_id: str,
) -> AIHubStepResult:
_validate_device(cfg) _validate_device(cfg)
resolved_model_id, source = _model_id_or_state_with_source(config_path, model_id) resolved_model_id = _model_id_or_state(config_path, model_id)
try: try:
result = aihub_jobs.submit_profile_job( result = aihub_jobs.submit_profile_job(
resolved_model_id, resolved_model_id,
@@ -501,41 +290,9 @@ def _profile_step(
except Exception as e: except Exception as e:
CONSOLE.print(f"[red]AI Hub profile failed: {e}[/red]") CONSOLE.print(f"[red]AI Hub profile failed: {e}[/red]")
raise typer.Exit(1) raise typer.Exit(1)
run = datetime.now().strftime("%Y%m%d-%H%M%S")
out_dir = Path(cfg.aihub.output_dir) / run / "profile"
try:
out_dir.mkdir(parents=True, exist_ok=True)
profile_data = result["job"].download_profile()
if isinstance(profile_data, Mapping):
(out_dir / "profile.json").write_text(json.dumps(profile_data, indent=2), encoding="utf-8")
else:
profile_data = {}
except Exception as e:
CONSOLE.print(f"[red]AI Hub profile download failed: {e}[/red]")
raise typer.Exit(1)
state_ops.store(config_path).update(last_profile_job_id=result["job_id"]) state_ops.store(config_path).update(last_profile_job_id=result["job_id"])
step_result = AIHubStepResult(
job=result["job"],
job_id=str(result["job_id"]),
model_id=resolved_model_id,
output_dir=out_dir,
profile=profile_data,
)
_record_step(
cfg,
tracker,
result=step_result,
source=source,
step="profile",
submission_id=submission_id,
command="ai-hub profile",
options=cfg.aihub.profile_options,
)
CONSOLE.print(f"[green]✓[/green] Profile job: [bold]{result['job_id']}[/bold]") CONSOLE.print(f"[green]✓[/green] Profile job: [bold]{result['job_id']}[/bold]")
CONSOLE.print(f"Profile: [cyan]{out_dir}[/cyan]") return str(result["job_id"])
return step_result
@app.command() @app.command()
@@ -550,16 +307,7 @@ def quantize(
) -> None: ) -> None:
"""Quantize an ONNX model to INT8.""" """Quantize an ONNX model to INT8."""
cfg = load_cfg(config) cfg = load_cfg(config)
_quantize_step( _quantize_step(cfg, config, calibration_path, from_job, model_s3_uri, onnx_path)
cfg,
config,
calibration_path,
from_job,
model_s3_uri,
onnx_path,
_tracker(cfg),
_submission_id(),
)
@app.command() @app.command()
@@ -574,17 +322,7 @@ def compile(
) -> None: ) -> None:
"""Compile a model for the configured Qualcomm AI Hub target.""" """Compile a model for the configured Qualcomm AI Hub target."""
cfg = load_cfg(config) cfg = load_cfg(config)
_compile_step( _compile_step(cfg, config, model_id, from_job, model_s3_uri, onnx_path, prefer_quantized=True)
cfg,
config,
model_id,
from_job,
model_s3_uri,
onnx_path,
prefer_quantized=True,
tracker=_tracker(cfg),
submission_id=_submission_id(),
)
@app.command() @app.command()
@@ -596,7 +334,7 @@ def validate(
) -> None: ) -> None:
"""Run an AI Hub inference job using sample inputs.""" """Run an AI Hub inference job using sample inputs."""
cfg = load_cfg(config) cfg = load_cfg(config)
_validate_step(cfg, config, input_file, model_id, input_name, _tracker(cfg), _submission_id()) _validate_step(cfg, config, input_file, model_id, input_name)
@app.command() @app.command()
@@ -606,7 +344,7 @@ def profile(
) -> None: ) -> None:
"""Profile a compiled model on the configured AI Hub device.""" """Profile a compiled model on the configured AI Hub device."""
cfg = load_cfg(config) cfg = load_cfg(config)
_profile_step(cfg, config, model_id, _tracker(cfg), _submission_id()) _profile_step(cfg, config, model_id)
@app.command() @app.command()
@@ -626,25 +364,13 @@ def upload(
cfg = load_cfg(config) cfg = load_cfg(config)
steps = [UploadStep.quantize, UploadStep.compile, UploadStep.validate, UploadStep.profile] steps = [UploadStep.quantize, UploadStep.compile, UploadStep.validate, UploadStep.profile]
selected = steps[steps.index(from_step) :] selected = steps[steps.index(from_step) :]
tracker = _tracker(cfg)
submission_id = _submission_id()
quantized_model_id: str | None = None quantized_model_id: str | None = None
compiled_model_id: str | None = None compiled_model_id: str | None = None
if UploadStep.quantize in selected: if UploadStep.quantize in selected:
quantized = _quantize_step( quantized_model_id = _quantize_step(cfg, config, calibration_path, from_job, model_s3_uri, onnx_path)
cfg,
config,
calibration_path,
from_job,
model_s3_uri,
onnx_path,
tracker,
submission_id,
)
quantized_model_id = quantized.model_id
if UploadStep.compile in selected: if UploadStep.compile in selected:
compiled = _compile_step( compiled_model_id = _compile_step(
cfg, cfg,
config, config,
model_id=quantized_model_id, model_id=quantized_model_id,
@@ -652,14 +378,11 @@ def upload(
model_s3_uri=model_s3_uri, model_s3_uri=model_s3_uri,
onnx_path=onnx_path, onnx_path=onnx_path,
prefer_quantized=True, prefer_quantized=True,
tracker=tracker,
submission_id=submission_id,
) )
compiled_model_id = compiled.model_id
if UploadStep.validate in selected: if UploadStep.validate in selected:
_validate_step(cfg, config, input_file, compiled_model_id, input_name, tracker, submission_id) _validate_step(cfg, config, input_file, compiled_model_id, input_name)
if UploadStep.profile in selected: if UploadStep.profile in selected:
_profile_step(cfg, config, compiled_model_id, tracker, submission_id) _profile_step(cfg, config, compiled_model_id)
@app.command() @app.command()

View File

@@ -1 +0,0 @@

View File

@@ -67,18 +67,6 @@ class CliStateStore:
def set_latest_experiment_model_version(self, version: str) -> None: def set_latest_experiment_model_version(self, version: str) -> None:
self.update(latest_experiment_model_version=version) self.update(latest_experiment_model_version=version)
def get_aihub_model_provenance(self, model_id: str) -> dict[str, Any]:
provenance = self._aihub_model_provenance(self.read())
value = provenance.get(model_id, {})
return dict(value) if isinstance(value, dict) else {}
def update_aihub_model_provenance(self, model_id: str, provenance: dict[str, Any]) -> None:
state = self.read()
model_provenance = self._aihub_model_provenance(state)
model_provenance[model_id] = provenance
state["aihub_model_provenance"] = model_provenance
self._write(state)
def _write(self, state: dict[str, Any]) -> None: def _write(self, state: dict[str, Any]) -> None:
with open(self.path, "w") as f: with open(self.path, "w") as f:
json.dump(state, f, indent=2) json.dump(state, f, indent=2)
@@ -87,10 +75,6 @@ class CliStateStore:
value = state.get("training_jobs", {}) value = state.get("training_jobs", {})
return dict(value) if isinstance(value, dict) else {} return dict(value) if isinstance(value, dict) else {}
def _aihub_model_provenance(self, state: dict[str, Any]) -> dict[str, Any]:
value = state.get("aihub_model_provenance", {})
return dict(value) if isinstance(value, dict) else {}
def store(config_path: str) -> CliStateStore: def store(config_path: str) -> CliStateStore:
config_dir = str(Path(config_path).parent) config_dir = str(Path(config_path).parent)

View File

@@ -1,3 +1,3 @@
from src.tracking.mlflow import AIHubSourceProvenance, AIHubStepRecord, MlflowTracker, NoopTracker, Tracker from src.tracking.mlflow import MlflowTracker, NoopTracker, Tracker
__all__ = ["AIHubSourceProvenance", "AIHubStepRecord", "MlflowTracker", "NoopTracker", "Tracker"] __all__ = ["MlflowTracker", "NoopTracker", "Tracker"]

View File

@@ -1,8 +1,5 @@
import os import os
import re
from collections.abc import Mapping
from dataclasses import dataclass from dataclasses import dataclass
from pathlib import Path
from typing import Any, Protocol from typing import Any, Protocol
import mlflow import mlflow
@@ -17,35 +14,6 @@ class Tracker(Protocol):
def finalize_training_run(self, *, run_id: str | None, training_job_status: Any) -> str | None: ... def finalize_training_run(self, *, run_id: str | None, training_job_status: Any) -> str | None: ...
def record_aihub_step(self, record: "AIHubStepRecord") -> str | None: ...
@dataclass(frozen=True)
class AIHubSourceProvenance:
kind: str
parent_run_id: str | None = None
uri: str | None = None
path: str | None = None
aihub_model_id: str | None = None
training_job: str | None = None
@dataclass(frozen=True)
class AIHubStepRecord:
step: str
submission_id: str
command: str
source: AIHubSourceProvenance
job: Any | None = None
job_id: str | None = None
model_id: str | None = None
target_runtime: str | None = None
device: str | None = None
options: str | None = None
output_dir: str | Path | None = None
outputs: Mapping[str, Any] | None = None
profile: Mapping[str, Any] | None = None
@dataclass(frozen=True) @dataclass(frozen=True)
class NoopTracker: class NoopTracker:
@@ -55,9 +23,6 @@ class NoopTracker:
def finalize_training_run(self, *, run_id: str | None, training_job_status: Any) -> str | None: def finalize_training_run(self, *, run_id: str | None, training_job_status: Any) -> str | None:
return None return None
def record_aihub_step(self, record: AIHubStepRecord) -> str | None:
return None
@dataclass(frozen=True) @dataclass(frozen=True)
class MlflowTracker: class MlflowTracker:
@@ -166,21 +131,6 @@ class MlflowTracker:
mlflow.set_tag("qc_cli.registered_model_version", version_number) mlflow.set_tag("qc_cli.registered_model_version", version_number)
return version_number return version_number
def record_aihub_step(self, record: AIHubStepRecord) -> str | None:
run_name = f"ai-hub {record.step}"
if record.source.parent_run_id:
with mlflow.start_run(run_id=record.source.parent_run_id):
child = mlflow.start_run(run_name=run_name, nested=True)
try:
self._log_aihub_record(record)
return str(child.info.run_id)
finally:
mlflow.end_run()
with mlflow.start_run(run_name=run_name) as run:
self._log_aihub_record(record)
return str(run.info.run_id)
def _log_params(self, params: dict[str, Any]) -> None: def _log_params(self, params: dict[str, Any]) -> None:
cleaned = {key: str(value) for key, value in params.items() if value is not None} cleaned = {key: str(value) for key, value in params.items() if value is not None}
if cleaned: if cleaned:
@@ -201,128 +151,3 @@ class MlflowTracker:
client.get_registered_model(name) client.get_registered_model(name)
except Exception: except Exception:
client.create_registered_model(name) client.create_registered_model(name)
def _log_aihub_record(self, record: AIHubStepRecord) -> None:
status = self._job_status(record.job)
job_id = record.job_id or self._job_attr(record.job, "job_id")
self._log_params(
{
"aihub.step": record.step,
"aihub.submission_id": record.submission_id,
"aihub.job_id": job_id,
"aihub.job_name": self._job_attr(record.job, "name"),
"aihub.job_type": self._job_attr(record.job, "job_type"),
"aihub.job_url": self._job_attr(record.job, "url"),
"aihub.model_id": record.model_id,
"aihub.target_runtime": record.target_runtime,
"aihub.device": record.device,
"aihub.options": record.options or self._job_attr(record.job, "options"),
"aihub.status": status.get("code"),
"aihub.failure_reason": status.get("message"),
"aihub.output_dir": record.output_dir,
"qc_cli.source_model.kind": record.source.kind,
"qc_cli.source_model.uri": record.source.uri,
"qc_cli.source_model.path": record.source.path,
"qc_cli.source_model.aihub_model_id": record.source.aihub_model_id,
"qc_cli.source_training_job": record.source.training_job,
"qc_cli.parent_mlflow_run_id": record.source.parent_run_id,
}
)
mlflow.set_tags(
{
"qc_cli.source": "ai_hub",
"qc_cli.stage": record.step,
"qc_cli.command": record.command,
"qc_cli.aihub_submission_id": record.submission_id,
}
)
self._log_output_stats(record.outputs)
self._log_profile(record.profile)
if record.output_dir:
output_dir = Path(record.output_dir)
if output_dir.exists() and output_dir.is_dir():
mlflow.log_artifacts(str(output_dir), artifact_path=f"aihub/{record.step}")
def _log_output_stats(self, outputs: Mapping[str, Any] | None) -> None:
if not outputs:
return
import numpy as np
params: dict[str, Any] = {}
metrics: dict[str, float] = {}
for name, value in outputs.items():
safe_name = self._metric_name(name)
arr = np.asarray(value)
params[f"aihub.inference.output.{safe_name}.shape"] = list(arr.shape)
params[f"aihub.inference.output.{safe_name}.dtype"] = str(arr.dtype)
metrics[f"aihub.inference.output.{safe_name}.count"] = float(arr.size)
if arr.size == 0 or not np.issubdtype(arr.dtype, np.number):
continue
numeric = arr.astype(float, copy=False)
finite = numeric[np.isfinite(numeric)]
metrics[f"aihub.inference.output.{safe_name}.nan_count"] = float(np.isnan(numeric).sum())
metrics[f"aihub.inference.output.{safe_name}.inf_count"] = float(np.isinf(numeric).sum())
if finite.size == 0:
continue
metrics[f"aihub.inference.output.{safe_name}.min"] = float(finite.min())
metrics[f"aihub.inference.output.{safe_name}.max"] = float(finite.max())
metrics[f"aihub.inference.output.{safe_name}.mean"] = float(finite.mean())
metrics[f"aihub.inference.output.{safe_name}.std"] = float(finite.std())
metrics[f"aihub.inference.output.{safe_name}.l1_norm"] = float(np.linalg.norm(finite, ord=1))
metrics[f"aihub.inference.output.{safe_name}.l2_norm"] = float(np.linalg.norm(finite, ord=2))
self._log_params(params)
if metrics:
mlflow.log_metrics(metrics)
def _log_profile(self, profile: Mapping[str, Any] | None) -> None:
if not profile:
return
mlflow.log_dict(dict(profile), "aihub/profile.json")
metrics = {
f"aihub.profile.{self._metric_name(path)}": float(value)
for path, value in self._flatten_numeric(profile).items()
}
if metrics:
mlflow.log_metrics(metrics)
def _flatten_numeric(self, value: Any, prefix: str = "") -> dict[str, float]:
if isinstance(value, Mapping):
flattened: dict[str, float] = {}
for key, item in value.items():
child_prefix = f"{prefix}.{key}" if prefix else str(key)
flattened.update(self._flatten_numeric(item, child_prefix))
return flattened
if isinstance(value, list | tuple):
flattened = {}
for index, item in enumerate(value):
child_prefix = f"{prefix}.{index}" if prefix else str(index)
flattened.update(self._flatten_numeric(item, child_prefix))
return flattened
if isinstance(value, bool):
return {}
if isinstance(value, int | float):
return {prefix: float(value)}
return {}
def _job_status(self, job: Any | None) -> dict[str, Any]:
if job is None or not hasattr(job, "get_status"):
return {}
status = job.get_status()
return {
"code": getattr(status, "code", None),
"message": getattr(status, "message", None),
}
def _job_attr(self, job: Any | None, name: str) -> Any:
if job is None:
return None
try:
return getattr(job, name)
except Exception:
return None
def _metric_name(self, value: str) -> str:
return re.sub(r"[^A-Za-z0-9_.-]+", "_", str(value)).strip("._") or "value"