New example and updated ai-hun upload order (#4)

Co-authored-by: samirodr <sami.rodrigue@slalom.com>
Reviewed-on: #4
This commit was merged in pull request #4.
This commit is contained in:
2026-06-12 14:34:44 +00:00
parent 5360a482fc
commit 522ddc74e2
16 changed files with 777 additions and 677 deletions

View File

@@ -1,4 +1,5 @@
from collections.abc import Mapping, Sequence
from dataclasses import dataclass
from datetime import datetime
from enum import StrEnum
from pathlib import Path
@@ -12,9 +13,9 @@ from src import state as state_ops
from src.commands.utils import CONFIG_OPT, CONSOLE, load_cfg
from src.config import Config
from src.qualcomm import aihub_jobs
from src.qualcomm.artifacts import resolve_onnx
from src.qualcomm.artifacts import ResolvedOnnx, resolve_onnx
app = typer.Typer(help="Quantize, compile, validate, profile, and download models with Qualcomm Workbench")
app = typer.Typer(help="Optimize, quantize, compile, validate, profile, and download models with Qualcomm Workbench")
_RUNTIME_EXTENSIONS = {
"tflite": "tflite",
@@ -24,12 +25,19 @@ _RUNTIME_EXTENSIONS = {
class UploadStep(StrEnum):
optimize = "optimize"
quantize = "quantize"
compile = "compile"
validate = "validate"
profile = "profile"
@dataclass(frozen=True)
class ResolvedModelSource:
model: str | Path
model_artifact: str | None = None
def _input_specs(cfg: Config) -> dict[str, tuple[tuple[int, ...], str]]:
specs = {name: (tuple(shape), dtype) for name, (shape, dtype) in cfg.aihub.input_specs.items()}
if not specs:
@@ -101,6 +109,57 @@ def _model_id_or_state(config_path: str, model_id: str | None, *, quantized: boo
return resolved
def _resolve_model_source(
cfg: Config,
config_path: str,
*,
model_id: str | None = None,
previous_model_id: str | None = None,
from_job: str | None = None,
model_s3_uri: str | None = None,
onnx_path: str | None = None,
) -> ResolvedModelSource:
if model_id:
return ResolvedModelSource(model_id)
has_explicit_source = bool(from_job or model_s3_uri or onnx_path)
if previous_model_id and not has_explicit_source:
return ResolvedModelSource(previous_model_id)
resolved = _resolve_onnx_source(
cfg,
config_path,
from_job=from_job,
model_s3_uri=model_s3_uri,
onnx_path=onnx_path,
)
return ResolvedModelSource(resolved.onnx_path, resolved.model_artifact)
def _resolve_onnx_source(
cfg: Config,
config_path: str,
*,
from_job: str | None = None,
model_s3_uri: str | None = None,
onnx_path: str | None = None,
) -> ResolvedOnnx:
st = state_ops.store(config_path)
last_training_job = st.get_last_training_job()
saved_model_artifact = None
if not from_job and not model_s3_uri and not onnx_path and not last_training_job:
saved_model_artifact = st.get_last_model_artifact()
return resolve_onnx(
cfg=cfg,
output_dir=cfg.aihub.output_dir,
from_job=from_job,
model_s3_uri=model_s3_uri or saved_model_artifact,
onnx_path=onnx_path,
last_training_job=last_training_job,
)
def _device_selector(device: Device) -> str:
parts: list[str] = []
if device.name:
@@ -132,20 +191,23 @@ def _quantize_step(
cfg: Config,
config_path: str,
calibration_path: Path,
from_job: str | None,
model_s3_uri: str | None,
onnx_path: str | None,
*,
model_id: str | None = None,
from_job: str | None = None,
model_s3_uri: str | None = None,
onnx_path: str | None = None,
) -> str:
st = state_ops.store(config_path)
specs = _input_specs(cfg)
try:
resolved = resolve_onnx(
cfg=cfg,
output_dir=cfg.aihub.output_dir,
source = _resolve_model_source(
cfg,
config_path,
model_id=model_id,
previous_model_id=st.get_last_optimized_model_id(),
from_job=from_job,
model_s3_uri=model_s3_uri or st.get_last_model_artifact(),
model_s3_uri=model_s3_uri,
onnx_path=onnx_path,
last_training_job=st.get_last_training_job(),
)
calibration_data = _load_calibration(calibration_path, specs)
except (FileNotFoundError, ValueError) as e:
@@ -153,73 +215,117 @@ def _quantize_step(
raise typer.Exit(1)
try:
hub_model = (
hub.upload_model(str(source.model), name=cfg.aihub.model_name)
if isinstance(source.model, Path)
else hub.get_model(source.model)
)
result = aihub_jobs.submit_quantize_job(
resolved.onnx_path,
hub_model,
calibration_data,
cfg.aihub.quantize_options,
job_name=_job_name(cfg, "quantize"),
model_name=cfg.aihub.model_name,
)
except Exception as e:
CONSOLE.print(f"[red]AI Hub quantize failed: {e}[/red]")
raise typer.Exit(1)
st.update(
last_model_artifact=resolved.model_artifact,
last_quantize_job_id=result["job_id"],
last_quantized_model_id=result["model_id"],
)
updates: dict[str, Any] = {
"last_quantize_job_id": result["job_id"],
"last_quantized_model_id": result["model_id"],
}
if source.model_artifact:
updates["last_model_artifact"] = source.model_artifact
st.update(**updates)
CONSOLE.print(f"[green]✓[/green] Quantize job: [bold]{result['job_id']}[/bold]")
CONSOLE.print(f"[green]✓[/green] Quantized model: [bold]{result['model_id']}[/bold]")
return str(result["model_id"])
def _optimize_step(
cfg: Config,
config_path: str,
from_job: str | None,
model_s3_uri: str | None,
onnx_path: str | None,
) -> str:
st = state_ops.store(config_path)
_validate_device(cfg)
specs = _input_specs(cfg)
try:
source = _resolve_onnx_source(
cfg,
config_path,
from_job=from_job,
model_s3_uri=model_s3_uri,
onnx_path=onnx_path,
)
except (FileNotFoundError, ValueError) as e:
CONSOLE.print(f"[red]{e}[/red]")
raise typer.Exit(1)
try:
hub_model = hub.upload_model(str(source.onnx_path), name=cfg.aihub.model_name)
result = aihub_jobs.submit_compile_job(
model=hub_model,
device=cfg.aihub.device,
input_specs=specs,
target_runtime="onnx",
job_name=_job_name(cfg, "optimize"),
)
except Exception as e:
CONSOLE.print(f"[red]AI Hub ONNX optimization failed: {e}[/red]")
raise typer.Exit(1)
st.update(
last_model_artifact=source.model_artifact,
last_optimize_job_id=result["job_id"],
last_optimized_model_id=result["model_id"],
)
CONSOLE.print(f"[green]✓[/green] ONNX optimization job: [bold]{result['job_id']}[/bold]")
CONSOLE.print(f"[green]✓[/green] Optimized ONNX model: [bold]{result['model_id']}[/bold]")
return str(result["model_id"])
def _compile_step(
cfg: Config,
config_path: str,
model_id: str | None,
from_job: str | None,
model_s3_uri: str | None,
onnx_path: str | None,
*,
prefer_quantized: bool,
model_id: str | None = None,
from_job: str | None = None,
model_s3_uri: str | None = None,
onnx_path: str | None = None,
) -> str:
st = state_ops.store(config_path)
_validate_device(cfg)
specs = _input_specs(cfg)
model: Any
model_artifact: str | None = None
has_explicit_source = bool(from_job or model_s3_uri or onnx_path)
if model_id:
model = model_id
elif prefer_quantized and not has_explicit_source and st.get_last_quantized_model_id():
model = st.get_last_quantized_model_id()
else:
try:
resolved = resolve_onnx(
cfg=cfg,
output_dir=cfg.aihub.output_dir,
from_job=from_job,
model_s3_uri=model_s3_uri,
onnx_path=onnx_path,
last_training_job=st.get_last_training_job(),
)
except (FileNotFoundError, ValueError) as e:
CONSOLE.print(f"[red]{e}[/red]")
raise typer.Exit(1)
model = resolved.onnx_path
model_artifact = resolved.model_artifact
try:
source = _resolve_model_source(
cfg,
config_path,
model_id=model_id,
previous_model_id=st.get_last_quantized_model_id(),
from_job=from_job,
model_s3_uri=model_s3_uri,
onnx_path=onnx_path,
)
except (FileNotFoundError, ValueError) as e:
CONSOLE.print(f"[red]{e}[/red]")
raise typer.Exit(1)
try:
hub_model = (
hub.upload_model(str(source.model), name=cfg.aihub.model_name)
if isinstance(source.model, Path)
else hub.get_model(source.model)
)
result = aihub_jobs.submit_compile_job(
model=model,
model=hub_model,
device=cfg.aihub.device,
input_specs=specs,
target_runtime=cfg.aihub.target_runtime,
options=cfg.aihub.compile_options,
job_name=_job_name(cfg, "compile"),
model_name=cfg.aihub.model_name if isinstance(model, Path) else None,
)
except Exception as e:
CONSOLE.print(f"[red]AI Hub compile failed: {e}[/red]")
@@ -229,8 +335,8 @@ def _compile_step(
"last_compile_job_id": result["job_id"],
"last_compiled_model_id": result["model_id"],
}
if model_artifact:
updates["last_model_artifact"] = model_artifact
if source.model_artifact:
updates["last_model_artifact"] = source.model_artifact
st.update(**updates)
CONSOLE.print(f"[green]✓[/green] Compile job: [bold]{result['job_id']}[/bold]")
CONSOLE.print(f"[green]✓[/green] Compiled model: [bold]{result['model_id']}[/bold]")
@@ -256,8 +362,9 @@ def _validate_step(
run = datetime.now().strftime("%Y%m%d-%H%M%S")
out_dir = Path(cfg.aihub.output_dir) / run / "validation"
try:
hub_model = hub.get_model(resolved_model_id)
result = aihub_jobs.submit_inference_job(
resolved_model_id,
hub_model,
cfg.aihub.device,
inputs,
out_dir,
@@ -281,8 +388,9 @@ def _profile_step(cfg: Config, config_path: str, model_id: str | None) -> str:
_validate_device(cfg)
resolved_model_id = _model_id_or_state(config_path, model_id)
try:
hub_model = hub.get_model(resolved_model_id)
result = aihub_jobs.submit_profile_job(
resolved_model_id,
hub_model,
cfg.aihub.device,
cfg.aihub.profile_options,
job_name=_job_name(cfg, "profile"),
@@ -295,9 +403,24 @@ def _profile_step(cfg: Config, config_path: str, model_id: str | None) -> str:
return str(result["job_id"])
@app.command()
def optimize(
from_job: str | None = typer.Option(None, "--from-job", help="Training job name whose model artifact should optimize"),
model_s3_uri: str | None = typer.Option(None, "--model-s3-uri", help="S3 URI of model.tar.gz to optimize"),
onnx_path: str | None = typer.Option(
None, "--onnx-path", help="Local ONNX path or ONNX path inside extracted artifact"
),
config: str = CONFIG_OPT,
) -> None:
"""Optimize an external model into a Workbench-produced ONNX model."""
cfg = load_cfg(config)
_optimize_step(cfg, config, from_job, model_s3_uri, onnx_path)
@app.command()
def quantize(
calibration_path: Path = typer.Argument(..., help="Calibration .npz file or directory of .npy samples"),
model_id: str | None = typer.Option(None, "--model-id", help="AI Hub optimized ONNX model ID"),
from_job: str | None = typer.Option(None, "--from-job", help="Training job name whose model artifact should quantize"),
model_s3_uri: str | None = typer.Option(None, "--model-s3-uri", help="S3 URI of model.tar.gz to quantize"),
onnx_path: str | None = typer.Option(
@@ -307,7 +430,15 @@ def quantize(
) -> None:
"""Quantize an ONNX model to INT8."""
cfg = load_cfg(config)
_quantize_step(cfg, config, calibration_path, from_job, model_s3_uri, onnx_path)
_quantize_step(
cfg,
config,
calibration_path,
model_id=model_id,
from_job=from_job,
model_s3_uri=model_s3_uri,
onnx_path=onnx_path,
)
@app.command()
@@ -322,7 +453,14 @@ def compile(
) -> None:
"""Compile a model for the configured Qualcomm AI Hub target."""
cfg = load_cfg(config)
_compile_step(cfg, config, model_id, from_job, model_s3_uri, onnx_path, prefer_quantized=True)
_compile_step(
cfg,
config,
model_id=model_id,
from_job=from_job,
model_s3_uri=model_s3_uri,
onnx_path=onnx_path,
)
@app.command()
@@ -351,7 +489,7 @@ def profile(
def upload(
calibration_path: Path = typer.Argument(..., help="Calibration .npz file or directory of .npy samples"),
input_file: Path = typer.Argument(..., help="Validation .npz or .npy inputs to run on device"),
from_step: UploadStep = typer.Option(UploadStep.quantize, "--from-step", help="Resume from this Workbench step"),
from_step: UploadStep = typer.Option(UploadStep.optimize, "--from-step", help="Resume from this Workbench step"),
from_job: str | None = typer.Option(None, "--from-job", help="Training job name whose model artifact should upload"),
model_s3_uri: str | None = typer.Option(None, "--model-s3-uri", help="S3 URI of model.tar.gz to upload"),
onnx_path: str | None = typer.Option(
@@ -360,25 +498,48 @@ def upload(
input_name: str | None = typer.Option(None, "--input-name", help="Input name for .npy validation files"),
config: str = CONFIG_OPT,
) -> None:
"""Run the four Workbench upload steps: quantize, compile, validate, and profile."""
"""Optimize, quantize, optionally compile, validate, and profile a model."""
cfg = load_cfg(config)
steps = [UploadStep.quantize, UploadStep.compile, UploadStep.validate, UploadStep.profile]
steps = [UploadStep.optimize, UploadStep.quantize, UploadStep.compile, UploadStep.validate, UploadStep.profile]
selected = steps[steps.index(from_step) :]
optimized_model_id: str | None = None
quantized_model_id: str | None = None
compiled_model_id: str | None = None
if UploadStep.optimize in selected:
optimized_model_id = _optimize_step(cfg, config, from_job, model_s3_uri, onnx_path)
if UploadStep.quantize in selected:
quantized_model_id = _quantize_step(cfg, config, calibration_path, from_job, model_s3_uri, onnx_path)
if UploadStep.compile in selected:
compiled_model_id = _compile_step(
if UploadStep.optimize not in selected:
optimized_model_id = state_ops.store(config).get_last_optimized_model_id()
if not optimized_model_id:
CONSOLE.print(
"[red]No optimized ONNX model found. Resume from --from-step optimize or run "
"'qc-cli ai-hub optimize' first.[/red]"
)
raise typer.Exit(1)
quantized_model_id = _quantize_step(
cfg,
config,
model_id=quantized_model_id,
from_job=from_job,
model_s3_uri=model_s3_uri,
onnx_path=onnx_path,
prefer_quantized=True,
calibration_path,
model_id=optimized_model_id,
)
if UploadStep.compile in selected:
if cfg.aihub.target_runtime == "onnx":
compiled_model_id = quantized_model_id or state_ops.store(config).get_last_quantized_model_id()
if not compiled_model_id:
CONSOLE.print(
"[red]No quantized ONNX model found. Resume from --from-step quantize or run "
"'qc-cli ai-hub quantize' first.[/red]"
)
raise typer.Exit(1)
state_ops.store(config).update(last_compiled_model_id=compiled_model_id)
CONSOLE.print("[green]✓[/green] Target runtime is ONNX; skipping final compile.")
else:
compiled_model_id = _compile_step(
cfg,
config,
model_id=quantized_model_id,
)
if UploadStep.validate in selected:
_validate_step(cfg, config, input_file, compiled_model_id, input_name)
if UploadStep.profile in selected:

View File

@@ -28,30 +28,19 @@ def _dataset_entries(inputs: dict[str, Any]) -> dict[str, list[Any]]:
def submit_compile_job(
model: Any,
model: Model,
device: Device,
input_specs: dict[str, tuple[tuple[int, ...], str]],
target_runtime: str,
options: str | None = None,
job_name: str | None = None,
model_name: str | None = None,
) -> ModelJobResult:
compile_options = f"--target_runtime {target_runtime}"
if options:
compile_options = f"{compile_options} {options}"
model_arg = model
if isinstance(model, Path):
model_arg = str(model)
elif isinstance(model, str):
candidate = Path(model)
model_arg = model if candidate.exists() or candidate.suffix else hub.get_model(model)
if model_name and isinstance(model_arg, str) and Path(model_arg).exists():
model_arg = hub.upload_model(model_arg, name=model_name)
job = hub.submit_compile_job(
model=model_arg,
model=model,
device=device,
name=job_name,
input_specs=input_specs,
@@ -64,14 +53,14 @@ def submit_compile_job(
def submit_inference_job(
model_id: str,
model: Model,
device: Device,
inputs: dict[str, Any],
output_dir: str | Path,
job_name: str | None = None,
) -> InferenceJobResult:
job = hub.submit_inference_job(
model=hub.get_model(model_id),
model=model,
device=device,
inputs=_dataset_entries(inputs),
name=job_name,
@@ -83,13 +72,13 @@ def submit_inference_job(
def submit_profile_job(
model_id: str,
model: Model,
device: Device,
options: str | None = None,
job_name: str | None = None,
) -> ProfileJobResult:
job = hub.submit_profile_job(
model=hub.get_model(model_id),
model=model,
device=device,
name=job_name,
options=options or "",
@@ -98,17 +87,13 @@ def submit_profile_job(
def submit_quantize_job(
model: str | Path,
model: Model,
calibration_data: dict[str, Any],
options: str | None = None,
job_name: str | None = None,
model_name: str | None = None,
) -> ModelJobResult:
model_arg = str(model)
if model_name and Path(model_arg).exists():
model_arg = hub.upload_model(model_arg, name=model_name)
job = hub.submit_quantize_job(
model=model_arg,
model=model,
calibration_data=_dataset_entries(calibration_data),
weights_dtype=QuantizeDtype.INT8,
activations_dtype=QuantizeDtype.INT8,

View File

@@ -37,6 +37,10 @@ class CliStateStore:
value = self.get("last_model_artifact")
return str(value) if value else None
def get_last_optimized_model_id(self) -> str | None:
value = self.get("last_optimized_model_id")
return str(value) if value else None
def get_last_quantized_model_id(self) -> str | None:
value = self.get("last_quantized_model_id")
return str(value) if value else None