"""Typer commands that drive Qualcomm AI Hub jobs: quantize, compile, validate, profile, upload, download."""

from collections.abc import Mapping, Sequence
from datetime import datetime
from enum import StrEnum
from pathlib import Path
from typing import Any

import qai_hub.hub as hub
import typer
from qai_hub.client import Device

from src import state as state_ops
from src.commands.utils import CONFIG_OPT, CONSOLE, load_cfg
from src.config import Config
from src.qualcomm import aihub_jobs
from src.qualcomm.artifacts import resolve_onnx

app = typer.Typer(help="Quantize, compile, validate, profile, and download models with Qualcomm Workbench")

# Maps the configured target runtime to the file extension used for the default download path.
# Runtimes not listed here fall back to using the runtime name itself as the extension (see `download`).
_RUNTIME_EXTENSIONS = {
    "tflite": "tflite",
    "qnn_context_binary": "bin",
    "onnx": "onnx",
}


class UploadStep(StrEnum):
    """Steps of the `upload` pipeline that can be selected with `--from-step`."""

    quantize = "quantize"
    compile = "compile"
    validate = "validate"
    profile = "profile"


def _input_specs(cfg: Config) -> dict[str, tuple[tuple[int, ...], str]]:
    """Return `cfg.aihub.input_specs` as `{name: (shape_tuple, dtype)}`.

    Prints an error and raises `typer.Exit(1)` when no input is configured.
    """
    specs = {name: (tuple(shape), dtype) for name, (shape, dtype) in cfg.aihub.input_specs.items()}
    if not specs:
        CONSOLE.print("[red]aihub.input_specs must define at least one input.[/red]")
        raise typer.Exit(1)
    return specs


def _load_inputs(
    input_file: Path,
    specs: Mapping[str, tuple[Sequence[int], str]],
    input_name: str | None = None,
) -> dict[str, Any]:
    """Load model inputs from a NumPy `.npz` or `.npy` file.

    Args:
        input_file: Path to the `.npz` archive or `.npy` array.
        specs: Configured inputs keyed by input name; only the keys are used here.
        input_name: Name to assign to a `.npy` array. Optional when `specs` has exactly one entry.

    Returns:
        A dict mapping input name to the loaded array. For `.npz`, one entry per name in `specs`;
        for `.npy`, a single entry.

    Raises:
        FileNotFoundError: `input_file` does not exist.
        ValueError: the archive lacks a configured input, `input_name` is missing/unknown,
            or the file suffix is neither `.npz` nor `.npy`.
    """
    # Imported lazily, as in the rest of this module, so numpy is only needed when inputs are loaded.
    import numpy as np

    if not input_file.exists():
        raise FileNotFoundError(f"File not found: {input_file}")

    if input_file.suffix == ".npz":
        # np.load returns an NpzFile that keeps the archive open; close it once the arrays are read.
        with np.load(input_file) as loaded:
            missing = set(specs) - set(loaded.files)
            if missing:
                raise ValueError(f"Missing input(s) in NPZ: {', '.join(sorted(missing))}")
            # Indexing the archive materializes each array, so the dict stays valid after close.
            return {name: loaded[name] for name in specs}

    if input_file.suffix == ".npy":
        if input_name is None:
            if len(specs) != 1:
                raise ValueError("--input-name is required when config has multiple inputs")
            # A single configured input is unambiguous, so use its name.
            input_name = next(iter(specs))
        if input_name not in specs:
            raise ValueError(f"Input name '{input_name}' is not defined in aihub.input_specs")
        return {input_name: np.load(input_file)}

    raise ValueError("Input file must be .npz or .npy")


def _load_calibration(path: Path, specs: Mapping[str, tuple[Sequence[int], str]]) -> dict[str, Any]:
    """Load calibration data from a single file or from a directory of `.npy` samples.

    A file is delegated to `_load_inputs`. A directory is only accepted for single-input
    configurations; its `*.npy` files are loaded in sorted filename order and returned as a list
    under that input's name.

    Raises:
        FileNotFoundError: `path` is neither a file nor a directory.
        ValueError: directory used with multiple inputs, or no `.npy` samples found
            (plus anything `_load_inputs` raises for the file case).
    """
    import numpy as np

    if path.is_file():
        return _load_inputs(path, specs)
    if not path.is_dir():
        raise FileNotFoundError(f"Calibration path not found: {path}")
    if len(specs) != 1:
        raise ValueError("Directory calibration data is supported only for single-input models.")

    input_name = next(iter(specs))
    samples = [np.load(p) for p in sorted(path.glob("*.npy"))]
    if not samples:
        raise ValueError(f"No .npy calibration samples found in {path}")
    return {input_name: samples}


def _job_name(cfg: Config, operation: str) -> str | None:
    """Return `"<cfg.aihub.job_name>-<operation>"`, or None when no job name is configured."""
    if not cfg.aihub.job_name:
        return None
    return f"{cfg.aihub.job_name}-{operation}"


def _model_id_or_state(config_path: str, model_id: str | None, *, quantized: bool = False) -> str:
    """Return `model_id`, falling back to the last model ID recorded in the state store.

    With `quantized=True` the fallback is the last quantized model ID, otherwise the last
    compiled model ID. Prints an error and raises `typer.Exit(1)` when neither is available.
    """
    st = state_ops.store(config_path)
    resolved = model_id or (st.get_last_quantized_model_id() if quantized else st.get_last_compiled_model_id())
    if not resolved:
        source = "quantized" if quantized else "compiled"
        CONSOLE.print(f"[red]No {source} model found. Pass --model-id or run the previous AI Hub step first.[/red]")
        raise typer.Exit(1)
    return resolved


def _device_selector(device: Device) -> str:
    """Describe the non-empty selector fields of `device` (name, os, attributes) for error messages."""
    parts: list[str] = []
    if device.name:
        parts.append(f"name={device.name!r}")
    if device.os:
        parts.append(f"os={device.os!r}")
    if device.attributes:
        parts.append(f"attributes={device.attributes!r}")
    return ", ".join(parts) if parts else "empty selector"


def _validate_device(cfg: Config) -> None:
    """Check that the configured device selector matches at least one AI Hub device.

    Prints an error and raises `typer.Exit(1)` when the lookup fails or returns no match.
    """
    device = cfg.aihub.device
    try:
        matches = hub.get_devices(name=device.name, os=device.os, attributes=device.attributes)
    except Exception as e:
        CONSOLE.print(f"[red]Unable to validate AI Hub device {_device_selector(device)}: {e}[/red]")
        raise typer.Exit(1) from e
    if matches:
        return
    CONSOLE.print(f"[red]AI Hub device not found: {_device_selector(device)}[/red]")
    CONSOLE.print("Run [bold]qai-hub list-devices[/bold] to see valid device names.")
    raise typer.Exit(1)


def _quantize_step(
    cfg: Config,
    config_path: str,
    calibration_path: Path,
    from_job: str | None,
    model_s3_uri: str | None,
    onnx_path: str | None,
) -> str:
    """Resolve the ONNX model, submit a quantize job, record the result in state, and return the model ID.

    When `model_s3_uri` is not given, the last model artifact recorded in state is passed to
    `resolve_onnx` instead. Any failure is printed and converted to `typer.Exit(1)`.
    """
    st = state_ops.store(config_path)
    specs = _input_specs(cfg)
    try:
        resolved = resolve_onnx(
            cfg=cfg,
            output_dir=cfg.aihub.output_dir,
            from_job=from_job,
            model_s3_uri=model_s3_uri or st.get_last_model_artifact(),
            onnx_path=onnx_path,
            last_training_job=st.get_last_training_job(),
        )
        calibration_data = _load_calibration(calibration_path, specs)
    except (FileNotFoundError, ValueError) as e:
        CONSOLE.print(f"[red]{e}[/red]")
        raise typer.Exit(1) from e

    try:
        result = aihub_jobs.submit_quantize_job(
            resolved.onnx_path,
            calibration_data,
            cfg.aihub.quantize_options,
            job_name=_job_name(cfg, "quantize"),
            model_name=cfg.aihub.model_name,
        )
    except Exception as e:
        CONSOLE.print(f"[red]AI Hub quantize failed: {e}[/red]")
        raise typer.Exit(1) from e

    st.update(
        last_model_artifact=resolved.model_artifact,
        last_quantize_job_id=result["job_id"],
        last_quantized_model_id=result["model_id"],
    )
    CONSOLE.print(f"[green]✓[/green] Quantize job: [bold]{result['job_id']}[/bold]")
    CONSOLE.print(f"[green]✓[/green] Quantized model: [bold]{result['model_id']}[/bold]")
    return str(result["model_id"])


def _compile_step(
    cfg: Config,
    config_path: str,
    model_id: str | None,
    from_job: str | None,
    model_s3_uri: str | None,
    onnx_path: str | None,
    *,
    prefer_quantized: bool,
) -> str:
    """Submit a compile job for the configured device, record the result in state, and return the model ID.

    The model to compile is chosen in this order:
      1. `model_id`, when given;
      2. the last quantized model ID from state, when `prefer_quantized` is set and none of
         `from_job` / `model_s3_uri` / `onnx_path` was passed;
      3. an ONNX file resolved via `resolve_onnx`.

    Any failure is printed and converted to `typer.Exit(1)`.
    """
    st = state_ops.store(config_path)
    _validate_device(cfg)
    specs = _input_specs(cfg)

    model: Any
    model_artifact: str | None = None
    has_explicit_source = bool(from_job or model_s3_uri or onnx_path)
    if model_id:
        model = model_id
    elif prefer_quantized and not has_explicit_source and st.get_last_quantized_model_id():
        model = st.get_last_quantized_model_id()
    else:
        try:
            resolved = resolve_onnx(
                cfg=cfg,
                output_dir=cfg.aihub.output_dir,
                from_job=from_job,
                model_s3_uri=model_s3_uri,
                onnx_path=onnx_path,
                last_training_job=st.get_last_training_job(),
            )
        except (FileNotFoundError, ValueError) as e:
            CONSOLE.print(f"[red]{e}[/red]")
            raise typer.Exit(1) from e
        model = resolved.onnx_path
        model_artifact = resolved.model_artifact

    try:
        result = aihub_jobs.submit_compile_job(
            model=model,
            device=cfg.aihub.device,
            input_specs=specs,
            target_runtime=cfg.aihub.target_runtime,
            options=cfg.aihub.compile_options,
            job_name=_job_name(cfg, "compile"),
            # A model name is only supplied when compiling from a local Path, not an existing model ID.
            model_name=cfg.aihub.model_name if isinstance(model, Path) else None,
        )
    except Exception as e:
        CONSOLE.print(f"[red]AI Hub compile failed: {e}[/red]")
        raise typer.Exit(1) from e

    updates: dict[str, Any] = {
        "last_compile_job_id": result["job_id"],
        "last_compiled_model_id": result["model_id"],
    }
    # Only overwrite the recorded artifact when this run actually resolved one.
    if model_artifact:
        updates["last_model_artifact"] = model_artifact
    st.update(**updates)
    CONSOLE.print(f"[green]✓[/green] Compile job: [bold]{result['job_id']}[/bold]")
    CONSOLE.print(f"[green]✓[/green] Compiled model: [bold]{result['model_id']}[/bold]")
    return str(result["model_id"])


def _validate_step(
    cfg: Config,
    config_path: str,
    input_file: Path,
    model_id: str | None,
    input_name: str | None,
) -> str:
    """Submit an inference job with inputs from `input_file`, record the job ID, and return it.

    The model is `model_id` or the last compiled model from state. The job is handed an output
    directory `<cfg.aihub.output_dir>/<YYYYmmdd-HHMMSS>/validation`. Any failure is printed and
    converted to `typer.Exit(1)`.
    """
    _validate_device(cfg)
    specs = _input_specs(cfg)
    resolved_model_id = _model_id_or_state(config_path, model_id)
    try:
        inputs = _load_inputs(input_file, specs, input_name)
    except (FileNotFoundError, ValueError) as e:
        CONSOLE.print(f"[red]{e}[/red]")
        raise typer.Exit(1) from e

    # Local-time timestamp keeps each validation run in its own directory.
    run = datetime.now().strftime("%Y%m%d-%H%M%S")
    out_dir = Path(cfg.aihub.output_dir) / run / "validation"
    try:
        result = aihub_jobs.submit_inference_job(
            resolved_model_id,
            cfg.aihub.device,
            inputs,
            out_dir,
            job_name=_job_name(cfg, "validate"),
        )
    except Exception as e:
        CONSOLE.print(f"[red]AI Hub inference failed: {e}[/red]")
        raise typer.Exit(1) from e

    state_ops.store(config_path).update(last_inference_job_id=result["job_id"])
    CONSOLE.print(f"[green]✓[/green] Inference job: [bold]{result['job_id']}[/bold]")
    outputs = result.get("outputs")
    if isinstance(outputs, dict):
        for name, value in outputs.items():
            # Values without a `shape` attribute are reported as '?'.
            CONSOLE.print(f"  {name}: shape={getattr(value, 'shape', '?')}")
    CONSOLE.print(f"Outputs: [cyan]{out_dir}[/cyan]")
    return str(result["job_id"])


def _profile_step(cfg: Config, config_path: str, model_id: str | None) -> str:
    """Submit a profile job for `model_id` (or the last compiled model), record the job ID, and return it.

    Any failure is printed and converted to `typer.Exit(1)`.
    """
    _validate_device(cfg)
    resolved_model_id = _model_id_or_state(config_path, model_id)
    try:
        result = aihub_jobs.submit_profile_job(
            resolved_model_id,
            cfg.aihub.device,
            cfg.aihub.profile_options,
            job_name=_job_name(cfg, "profile"),
        )
    except Exception as e:
        CONSOLE.print(f"[red]AI Hub profile failed: {e}[/red]")
        raise typer.Exit(1) from e
    state_ops.store(config_path).update(last_profile_job_id=result["job_id"])
    CONSOLE.print(f"[green]✓[/green] Profile job: [bold]{result['job_id']}[/bold]")
    return str(result["job_id"])


# NOTE: the docstrings of the command functions below are shown by Typer as CLI help text,
# so they are user-facing strings; explanatory notes go in `#` comments instead.


@app.command()
def quantize(
    calibration_path: Path = typer.Argument(..., help="Calibration .npz file or directory of .npy samples"),
    from_job: str | None = typer.Option(None, "--from-job", help="Training job name whose model artifact should quantize"),
    model_s3_uri: str | None = typer.Option(None, "--model-s3-uri", help="S3 URI of model.tar.gz to quantize"),
    onnx_path: str | None = typer.Option(
        None, "--onnx-path", help="Local ONNX path or ONNX path inside extracted artifact"
    ),
    config: str = CONFIG_OPT,
) -> None:
    """Quantize an ONNX model to INT8."""
    cfg = load_cfg(config)
    _quantize_step(cfg, config, calibration_path, from_job, model_s3_uri, onnx_path)


@app.command()
def compile(
    model_id: str | None = typer.Option(None, "--model-id", help="AI Hub model ID to compile"),
    from_job: str | None = typer.Option(None, "--from-job", help="Training job name whose model artifact should compile"),
    model_s3_uri: str | None = typer.Option(None, "--model-s3-uri", help="S3 URI of model.tar.gz to compile"),
    onnx_path: str | None = typer.Option(
        None, "--onnx-path", help="Local ONNX path or ONNX path inside extracted artifact"
    ),
    config: str = CONFIG_OPT,
) -> None:
    """Compile a model for the configured Qualcomm AI Hub target."""
    # The function name doubles as the CLI command name, hence the shadowed builtin.
    cfg = load_cfg(config)
    _compile_step(cfg, config, model_id, from_job, model_s3_uri, onnx_path, prefer_quantized=True)


@app.command()
def validate(
    input_file: Path = typer.Argument(..., help="NumPy .npz or .npy inputs to run on device"),
    model_id: str | None = typer.Option(None, "--model-id", help="AI Hub compiled model ID"),
    input_name: str | None = typer.Option(None, "--input-name", help="Input name for .npy files"),
    config: str = CONFIG_OPT,
) -> None:
    """Run an AI Hub inference job using sample inputs."""
    cfg = load_cfg(config)
    _validate_step(cfg, config, input_file, model_id, input_name)


@app.command()
def profile(
    model_id: str | None = typer.Option(None, "--model-id", help="AI Hub compiled model ID"),
    config: str = CONFIG_OPT,
) -> None:
    """Profile a compiled model on the configured AI Hub device."""
    cfg = load_cfg(config)
    _profile_step(cfg, config, model_id)


@app.command()
def upload(
    calibration_path: Path = typer.Argument(..., help="Calibration .npz file or directory of .npy samples"),
    input_file: Path = typer.Argument(..., help="Validation .npz or .npy inputs to run on device"),
    from_step: UploadStep = typer.Option(UploadStep.quantize, "--from-step", help="Resume from this Workbench step"),
    from_job: str | None = typer.Option(None, "--from-job", help="Training job name whose model artifact should upload"),
    model_s3_uri: str | None = typer.Option(None, "--model-s3-uri", help="S3 URI of model.tar.gz to upload"),
    onnx_path: str | None = typer.Option(
        None, "--onnx-path", help="Local ONNX path or ONNX path inside extracted artifact"
    ),
    input_name: str | None = typer.Option(None, "--input-name", help="Input name for .npy validation files"),
    config: str = CONFIG_OPT,
) -> None:
    """Run the four Workbench upload steps: quantize, compile, validate, and profile."""
    cfg = load_cfg(config)
    steps = [UploadStep.quantize, UploadStep.compile, UploadStep.validate, UploadStep.profile]
    # Run `from_step` and every step after it.
    selected = steps[steps.index(from_step) :]

    # Model IDs stay None for skipped steps; later steps then fall back to the state store.
    quantized_model_id: str | None = None
    compiled_model_id: str | None = None
    if UploadStep.quantize in selected:
        quantized_model_id = _quantize_step(cfg, config, calibration_path, from_job, model_s3_uri, onnx_path)
    if UploadStep.compile in selected:
        compiled_model_id = _compile_step(
            cfg,
            config,
            model_id=quantized_model_id,
            from_job=from_job,
            model_s3_uri=model_s3_uri,
            onnx_path=onnx_path,
            prefer_quantized=True,
        )
    if UploadStep.validate in selected:
        _validate_step(cfg, config, input_file, compiled_model_id, input_name)
    if UploadStep.profile in selected:
        _profile_step(cfg, config, compiled_model_id)


@app.command()
def download(
    model_id: str | None = typer.Option(None, "--model-id", help="AI Hub compiled model ID"),
    output: Path | None = typer.Option(None, "--output", "-o", help="Destination file path"),
    config: str = CONFIG_OPT,
) -> None:
    """Download the last compiled deployable artifact from AI Hub."""
    cfg = load_cfg(config)
    resolved_model_id = _model_id_or_state(config, model_id)
    # Unknown runtimes use the runtime name itself as the file extension.
    ext = _RUNTIME_EXTENSIONS.get(cfg.aihub.target_runtime, cfg.aihub.target_runtime)
    dest = output or (Path(cfg.aihub.output_dir) / f"model.{ext}")
    try:
        written = aihub_jobs.download_model(resolved_model_id, dest)
    except Exception as e:
        CONSOLE.print(f"[red]AI Hub download failed: {e}[/red]")
        raise typer.Exit(1) from e
    state_ops.store(config).update(last_downloaded_model=written)
    CONSOLE.print(f"[green]✓[/green] Downloaded model: [cyan]{written}[/cyan]")