Source code for fermilink.implement.contract

from __future__ import annotations

import copy
import shlex
import sys
from pathlib import Path
from typing import Any

import yaml

from . import state as implement_state


DEFAULT_MAX_ITERATIONS = 80
DEFAULT_STOP_ON_CONSECUTIVE_REJECTIONS = 20
DEFAULT_WORKER_MAX_ITERATIONS = 8
DEFAULT_WORKER_WAIT_SECONDS = 1.0
DEFAULT_WORKER_MAX_WAIT_SECONDS = 6000.0
DEFAULT_TIMEOUT_SECONDS = 1800
DEFAULT_MIN_SCORE_IMPROVEMENT = 1.0e-6

WORKLOAD_SPLIT_KEY = "workload_split"
WORKLOAD_SPLIT_TRAIN_PREFIXES = ("train-", "worker-", "public-")
WORKLOAD_SPLIT_TEST_PREFIXES = ("test-", "holdout-", "controller-", "private-")


def _cli():
    from fermilink import cli

    return cli


def _str_list(payload: object) -> list[str]:
    if not isinstance(payload, list):
        return []
    return [
        str(item).strip()
        for item in payload
        if isinstance(item, str) and str(item).strip()
    ]


def _workload_id(value: str) -> str:
    text = str(value or "").strip()
    if not text:
        return ""
    return text.split(":", 1)[0].strip()


def _has_prefixed_workload_split(workloads: list[str]) -> bool:
    has_worker = False
    has_controller = False
    for workload in workloads:
        workload_id = _workload_id(workload).lower()
        if workload_id.startswith(WORKLOAD_SPLIT_TRAIN_PREFIXES):
            has_worker = True
        if workload_id.startswith(WORKLOAD_SPLIT_TEST_PREFIXES):
            has_controller = True
    return has_worker and has_controller


[docs] def infer_workload_split(workloads: list[str]) -> dict[str, Any]: normalized = [str(item).strip() for item in workloads if str(item).strip()] if len(normalized) < 2: return { "enabled": False, "source": "insufficient_workloads", "worker_workloads": normalized, "controller_workloads": [], "worker_workload_ids": [_workload_id(item) for item in normalized], "controller_workload_ids": [], } worker_workloads: list[str] = [] controller_workloads: list[str] = [] if _has_prefixed_workload_split(normalized): for workload in normalized: workload_id = _workload_id(workload).lower() if workload_id.startswith(WORKLOAD_SPLIT_TEST_PREFIXES): controller_workloads.append(workload) else: worker_workloads.append(workload) source = "prefix" else: worker_workloads = normalized[:-1] controller_workloads = normalized[-1:] source = "deterministic_holdout" return { "enabled": bool(worker_workloads and controller_workloads), "source": source, "worker_workloads": worker_workloads, "controller_workloads": controller_workloads, "worker_workload_ids": [_workload_id(item) for item in worker_workloads], "controller_workload_ids": [ _workload_id(item) for item in controller_workloads ], }
def _command_list_from_text(text: str) -> list[str]: try: tokens = shlex.split(str(text or "").strip()) except ValueError: return [] return [token for token in tokens if token]
[docs] def normalize_command_list(payload: object) -> list[list[str]]: """Normalize command specs to a list of token lists.""" commands: list[list[str]] = [] if isinstance(payload, str): command = _command_list_from_text(payload) return [command] if command else [] if not isinstance(payload, list): return [] for item in payload: if isinstance(item, str): command = _command_list_from_text(item) elif isinstance(item, list): command = [ str(token).strip() for token in item if isinstance(token, str) and str(token).strip() ] else: command = [] if command: commands.append(command) return commands
def _default_editable_scope(project_root: Path, goal_spec: dict[str, Any]) -> list[str]: explicit = _str_list(goal_spec.get("editable_scope")) if explicit: return explicit for candidate in ("src", "lib"): if (project_root / candidate).is_dir(): return [f"{candidate}/**"] if any(project_root.glob("*.py")): return ["*.py", "tests/**"] return ["**/*"] def _default_validation_commands( project_root: Path, goal_spec: dict[str, Any], ) -> list[list[str]]: explicit = normalize_command_list(goal_spec.get("validation_commands")) if explicit: return explicit if (project_root / "tests").is_dir(): return [[sys.executable, "-m", "pytest", "-q"]] return [] def _baseline_mode(goal_spec: dict[str, Any]) -> str: baseline = str(goal_spec.get("baseline_reference") or "").strip() if baseline: return "reference" return "exploratory"
[docs] def build_default_contract( project_root: Path, *, package_id: str, goal_spec: dict[str, Any], analysis: dict[str, Any] | None = None, ) -> dict[str, Any]: """Build a deterministic fallback contract from goal.md.""" validation_commands = _default_validation_commands(project_root, goal_spec) build_commands = normalize_command_list(goal_spec.get("build_commands")) legacy_pre_commands = normalize_command_list(goal_spec.get("pre_commands")) pre_commands = build_commands if build_commands else legacy_pre_commands implementation_id = f"implement-{package_id}" target = str(goal_spec.get("target") or "").strip() done_criteria = _str_list(goal_spec.get("done_criteria")) desired_outputs = _str_list(goal_spec.get("desired_outputs")) workloads = _str_list(goal_spec.get("workloads")) workload_split = infer_workload_split(workloads) input_api = str(goal_spec.get("input_api") or "").strip() if not input_api and isinstance(analysis, dict): api_candidates = analysis.get("proposed_api") or analysis.get("api") if isinstance(api_candidates, str): input_api = api_candidates.strip() if not input_api: input_api = "Infer and implement the smallest public API needed by goal.md." return { "schema_version": 1, "implementation_id": implementation_id, "package_id": package_id, "target": target, "baseline": { "mode": _baseline_mode(goal_spec), "reference": str(goal_spec.get("baseline_reference") or "").strip(), "optional": bool(goal_spec.get("baseline_optional", True)), }, "api": { "input": input_api, "lock_after_first_passing_validation": True, }, "outputs": { "desired": desired_outputs, "flexible": not bool(desired_outputs), }, "workloads": workloads, WORKLOAD_SPLIT_KEY: workload_split, "repo": { "editable_paths": _default_editable_scope(project_root, goal_spec), "immutable_paths": [ ".fermilink-implement/**", ".fermilink-optimize/**", "skills/**", ], }, "pre_commands": { "worker": copy.deepcopy(pre_commands), "controller": copy.deepcopy(pre_commands), }, "validation": { "mode": "progressive", "commands": validation_commands, "source": ( "goal_code_blocks" if normalize_command_list(goal_spec.get("validation_commands")) else "agent_or_default" ), "allow_partial_improvements": True, "requires_complete_for_final": True, }, "scoring": { "max_score": 100.0, "min_score_improvement": DEFAULT_MIN_SCORE_IMPROVEMENT, "complete_score": 100.0, }, "campaign": { "max_iterations": DEFAULT_MAX_ITERATIONS, "stop_on_consecutive_rejections": DEFAULT_STOP_ON_CONSECUTIVE_REJECTIONS, }, "worker": { "max_iterations": DEFAULT_WORKER_MAX_ITERATIONS, "wait_seconds": DEFAULT_WORKER_WAIT_SECONDS, "max_wait_seconds": DEFAULT_WORKER_MAX_WAIT_SECONDS, "pid_stall_seconds": 900.0, }, "controller": { "timeout_seconds": DEFAULT_TIMEOUT_SECONDS, }, "guardrails": { "reject_out_of_scope_edits": True, "reject_validation_weakening": True, "reject_hardcoded_workload_answers": True, "reject_unrelated_algorithm_changes": True, }, "done_criteria": done_criteria, "non_goals": _str_list(goal_spec.get("non_goals")), "autogen": { "created_at_utc": implement_state.utc_now_z(), "source": "default_contract", }, }
[docs] def validate_contract(payload: dict[str, Any]) -> None: cli = _cli() if not isinstance(payload, dict): raise cli.PackageError("Implementation contract must be a YAML object.") if int(payload.get("schema_version") or 0) != 1: raise cli.PackageError("Implementation contract schema_version must be 1.") repo = payload.get("repo") if not isinstance(repo, dict): raise cli.PackageError("Implementation contract missing repo block.") editable_paths = repo.get("editable_paths") if not isinstance(editable_paths, list) or not _str_list(editable_paths): raise cli.PackageError( "Implementation contract repo.editable_paths is required." ) validation = payload.get("validation") if not isinstance(validation, dict): raise cli.PackageError("Implementation contract missing validation block.") commands = validation.get("commands") if commands is not None and not isinstance(commands, list): raise cli.PackageError( "Implementation contract validation.commands must be a list." ) split = payload.get(WORKLOAD_SPLIT_KEY) if split is not None and not isinstance(split, dict): raise cli.PackageError( "Implementation contract workload_split must be an object." ) pre_commands = payload.get("pre_commands") if pre_commands is not None: if not isinstance(pre_commands, dict): raise cli.PackageError( "Implementation contract pre_commands must be an object." ) for key in ("worker", "controller"): raw = pre_commands.get(key) if raw is None: continue if not isinstance(raw, list): raise cli.PackageError( f"Implementation contract pre_commands.{key} must be a list." ) for index, command in enumerate(raw, start=1): if not isinstance(command, list) or not command: raise cli.PackageError( "Implementation contract " f"pre_commands.{key}[{index}] must be a non-empty list." ) if not all(isinstance(item, str) and item.strip() for item in command): raise cli.PackageError( "Implementation contract " f"pre_commands.{key}[{index}] must contain strings." )
[docs] def load_contract(path: Path) -> dict[str, Any]: cli = _cli() try: payload = yaml.safe_load(path.read_text(encoding="utf-8")) except OSError as exc: raise cli.PackageError( f"Failed to read implementation contract: {exc}" ) from exc except yaml.YAMLError as exc: raise cli.PackageError( f"Invalid YAML in implementation contract: {exc}" ) from exc if not isinstance(payload, dict): raise cli.PackageError("Implementation contract must contain a YAML object.") validate_contract(payload) return payload
[docs] def write_contract(path: Path, payload: dict[str, Any]) -> None: validate_contract(payload) path.parent.mkdir(parents=True, exist_ok=True) path.write_text( yaml.safe_dump(payload, sort_keys=False, default_flow_style=False), encoding="utf-8", )
[docs] def editable_paths(payload: dict[str, Any]) -> list[str]: repo = payload.get("repo") if isinstance(payload, dict) else {} return _str_list(repo.get("editable_paths") if isinstance(repo, dict) else None)
[docs] def immutable_paths(payload: dict[str, Any]) -> list[str]: repo = payload.get("repo") if isinstance(payload, dict) else {} return _str_list(repo.get("immutable_paths") if isinstance(repo, dict) else None)
[docs] def pre_commands(payload: dict[str, Any], role: str) -> list[list[str]]: pre = payload.get("pre_commands") if not isinstance(pre, dict): return [] return normalize_command_list(pre.get(role))
[docs] def validation_commands(payload: dict[str, Any]) -> list[list[str]]: validation = payload.get("validation") if not isinstance(validation, dict): return [] return normalize_command_list(validation.get("commands"))
[docs] def workload_split(payload: dict[str, Any]) -> dict[str, Any]: split = payload.get(WORKLOAD_SPLIT_KEY) if isinstance(split, dict): return copy.deepcopy(split) return infer_workload_split(_str_list(payload.get("workloads")))
[docs] def worker_visible_contract(payload: dict[str, Any]) -> dict[str, Any]: worker_payload = copy.deepcopy(payload) split = workload_split(worker_payload) enabled = bool(split.get("enabled")) if enabled: worker_payload["workloads"] = _str_list(split.get("worker_workloads")) worker_payload[WORKLOAD_SPLIT_KEY] = { "enabled": enabled, "role": "worker", "source": str(split.get("source") or ""), "worker_workload_ids": _str_list(split.get("worker_workload_ids")), "controller_workload_count": len(_str_list(split.get("controller_workloads"))), "controller_workloads_redacted": enabled, } validation = worker_payload.get("validation") if isinstance(validation, dict): worker_commands = ( validation.get("worker_commands") if validation.get("worker_commands") is not None else validation.get("public_commands") ) if worker_commands is not None: validation["commands"] = normalize_command_list(worker_commands) validation["commands_source"] = "worker_visible" elif enabled: validation.pop("commands", None) validation["commands_hidden_from_worker"] = True validation["note"] = ( "Authoritative controller validation commands are hidden from " "the worker when controller-only workloads are present." ) return worker_payload
[docs] def campaign_config(payload: dict[str, Any]) -> dict[str, Any]: campaign = payload.get("campaign") return campaign if isinstance(campaign, dict) else {}
[docs] def worker_config(payload: dict[str, Any]) -> dict[str, Any]: worker = payload.get("worker") return worker if isinstance(worker, dict) else {}
[docs] def controller_config(payload: dict[str, Any]) -> dict[str, Any]: controller = payload.get("controller") return controller if isinstance(controller, dict) else {}
[docs] def scoring_config(payload: dict[str, Any]) -> dict[str, Any]: scoring = payload.get("scoring") return scoring if isinstance(scoring, dict) else {}