Source code for fermilink.cli.commands.optimize

from __future__ import annotations

import argparse
from pathlib import Path

from fermilink.optimize import goal as optimize_goal
from fermilink.optimize import main as optimize_controller
from fermilink.optimize import state as optimize_state


def _cli():
    from fermilink import cli

    return cli


def _normalize_text(value: object) -> str:
    if not isinstance(value, str):
        return ""
    return value.strip()


def _looks_like_quick_prompt(target: str) -> bool:
    lowered = target.lower()
    if lowered.endswith((".md", ".markdown", ".txt")):
        return True
    path = Path(target).expanduser()
    return path.is_file()


def _looks_like_markdown_prompt(target: str) -> bool:
    lowered = target.lower()
    if lowered.endswith((".md", ".markdown")):
        return True
    path = Path(target).expanduser()
    if not path.is_file():
        return False
    return path.suffix.lower() in {".md", ".markdown"}


def _is_goal_markdown_file(target: str) -> bool:
    """Return True if *target* points to a goal-structured markdown file."""

    path = Path(target).expanduser()
    if not path.is_file():
        return False
    try:
        text = path.read_text(encoding="utf-8")
    except OSError:
        return False
    return optimize_goal.is_goal_markdown(text)


def _resolve_optimize_mode(args: argparse.Namespace) -> str:
    cli = _cli()
    target = _normalize_text(getattr(args, "package_id", None))
    project_path = _normalize_text(getattr(args, "project_path", None))
    benchmark = _normalize_text(getattr(args, "benchmark", None))
    explicit_goal = bool(getattr(args, "goal", False))

    if target.lower() == "status":
        return "status"

    if project_path:
        if not target:
            raise cli.PackageError(
                "Optimize expert mode requires `<package_id> <project_path>`."
            )
        return "expert"

    if target and _looks_like_quick_prompt(target):
        # Goal mode defaults for markdown inputs. Quick mode remains available
        # only through internal controller entrypoints, not top-level CLI routing.
        if (
            explicit_goal
            or _looks_like_markdown_prompt(target)
            or _is_goal_markdown_file(target)
        ):
            return "goal"
        raise cli.PackageError(
            "Quick optimize mode is disabled at the top-level CLI. Use a markdown "
            "goal file (`fermilink optimize goal.md`) or expert mode "
            "(`fermilink optimize <package_id> <project_path> --benchmark <path>`)."
        )

    if benchmark and not project_path:
        raise cli.PackageError(
            "Optimize expert mode requires `<package_id> <project_path>` when "
            "`--benchmark` is provided."
        )

    if not target:
        raise cli.PackageError(
            "Optimize requires one of: `status`, `<goal.md>`, or "
            "`<package_id> <project_path> --benchmark <path>`."
        )

    raise cli.PackageError(
        "Unable to infer optimize mode from arguments. Use `fermilink optimize "
        "status`, `fermilink optimize goal.md`, "
        "or expert mode "
        "`fermilink optimize <package_id> <project_path> --benchmark <path>`."
    )


def _emit_status(args: argparse.Namespace, payload: dict[str, object]) -> int:
    cli = _cli()
    lines = [
        f"Optimize status: {payload.get('project_root')}",
        (
            f"Run lock: {payload.get('run_lock_status')} "
            f"(pid={payload.get('run_lock_pid')}, started={payload.get('run_lock_started_at')})"
        ),
        (
            f"Progress: iteration={payload.get('iteration')}, accepted="
            f"{payload.get('accepted_count')}, rejected={payload.get('rejected_count')}, "
            f"consecutive_rejections={payload.get('consecutive_rejections')}"
        ),
        (
            f"Incumbent: {payload.get('incumbent_commit')} "
            f"({payload.get('primary_metric_name')}={payload.get('incumbent_primary_metric')})"
        ),
        (
            f"Benchmark runtime: mode={payload.get('runtime_mode')}, "
            f"launcher={payload.get('launcher_status')}"
        ),
        "Recent results:",
        str(payload.get("recent_results") or "(no results yet)"),
    ]
    cli._emit_output(args, payload, lines)
    return 0


def _emit_campaign(args: argparse.Namespace, payload: dict[str, object]) -> int:
    cli = _cli()
    lines = [
        (
            f"Optimize campaign for '{payload.get('package_id')}' on branch "
            f"{payload.get('branch')}."
        ),
        (
            f"Incumbent commit: {payload.get('incumbent_commit')} "
            f"({payload.get('primary_metric_name')}="
            f"{payload.get('incumbent_primary_metric')})."
        ),
        (
            "Results: "
            f"{payload.get('accepted_count')} accepted, "
            f"{payload.get('rejected_count')} rejected."
        ),
    ]
    scaffold_benchmark = str(payload.get("scaffold_benchmark_path") or "").strip()
    run_script_path = str(payload.get("generated_run_script_path") or "").strip()
    scaffold_command_source = str(payload.get("scaffold_command_source") or "").strip()
    reference_examples = payload.get("scaffold_reference_examples")
    reference_examples = (
        reference_examples if isinstance(reference_examples, dict) else {}
    )
    if scaffold_benchmark:
        lines.append(f"Quick benchmark: {scaffold_benchmark}")
    if scaffold_command_source:
        lines.append(f"Case command source: {scaffold_command_source}")
    reference_benchmark = str(reference_examples.get("benchmark") or "").strip()
    reference_runner = str(reference_examples.get("runner") or "").strip()
    if reference_benchmark or reference_runner:
        lines.append(
            "Reference templates: "
            f"benchmark={reference_benchmark or 'n/a'}, "
            f"runner={reference_runner or 'n/a'}"
        )
    if run_script_path:
        lines.append(f"Generated run script: {run_script_path}")
    cli._emit_output(args, payload, lines)
    return 0


[docs] def cmd_optimize(args: argparse.Namespace) -> int: """Execute optimize goal mode, expert mode, or campaign status.""" cli = _cli() raw_worker_model = getattr(args, "worker_model", None) if raw_worker_model is not None: worker_model_text = str(raw_worker_model).strip() if not worker_model_text: raise cli.PackageError("--worker-model cannot be empty.") args.worker_model = worker_model_text if bool(getattr(args, "baseline_only", False)) and bool( getattr(args, "plan_only", False) ): raise cli.PackageError("Cannot combine --baseline-only with --plan-only.") mode = _resolve_optimize_mode(args) lock_path: Path | None = None if mode == "expert": project_path = _normalize_text(getattr(args, "project_path", None)) if project_path: lock_path = optimize_state.run_lock_path( cli._resolve_project_path(project_path) ) elif mode == "goal": lock_path = optimize_state.run_lock_path(Path.cwd().resolve()) try: if mode == "status": payload = optimize_controller.read_campaign_status(args) return _emit_status(args, payload) if mode == "goal": payload = optimize_controller.run_goal_campaign(args) return _emit_campaign(args, payload) payload = optimize_controller.run_campaign(args) except KeyboardInterrupt: cli._print_tagged("optimize", "interrupted by user.", stderr=True) return 130 finally: if isinstance(lock_path, Path): optimize_state.clear_run_lock(lock_path) return _emit_campaign(args, payload)