from __future__ import annotations
import argparse
from dataclasses import dataclass
from pathlib import Path
import sys
from fermilink.drvloop.artifacts import record_artifact_changes
from fermilink.drvloop.instructions import materialize_drvloop_instructions
from fermilink.drvloop.memory import ensure_drvloop_memory
from fermilink.drvloop.prompts import (
DRVLOOP_DONE_TOKEN,
DRVLOOP_PROMPT_PREFIX,
)
[docs]
@dataclass(frozen=True)
class DrvloopConfig:
repo_dir: Path
user_prompt: str
prompt_file: str | None = None
max_iterations: int = 30
sandbox: str | None = None
[docs]
def run_drvloop(config: DrvloopConfig) -> int:
"""Run the minimal derivation loop."""
repo_dir = config.repo_dir.resolve()
repo_dir.mkdir(parents=True, exist_ok=True)
try:
instruction_files = materialize_drvloop_instructions(repo_dir)
except OSError as exc:
raise ValueError(
f"Failed to prepare drvloop agent instructions: {exc}"
) from exc
_print_tagged(
"drvloop",
f"instructions: {instruction_files.agents_path.relative_to(repo_dir)}",
)
memory_path = ensure_drvloop_memory(
repo_dir=repo_dir,
user_prompt=config.user_prompt,
prompt_file=config.prompt_file,
)
_print_tagged("drvloop", f"memory: {memory_path.relative_to(repo_dir)}")
max_iterations = _positive_int(config.max_iterations, "max_iterations")
for iteration in range(1, max_iterations + 1):
_print_tagged("drvloop", f"iteration {iteration}/{max_iterations}")
artifact_changes = record_artifact_changes(repo_dir, memory_path)
prompt = build_drvloop_prompt(
repo_dir=repo_dir,
user_prompt=config.user_prompt,
artifact_changes=artifact_changes,
)
run_result = _run_provider_turn(
repo_dir=repo_dir,
prompt=prompt,
sandbox=config.sandbox,
)
if bool(run_result.get("stopped_by_user")):
_print_tagged("drvloop", "provider run stopped by user")
return 130
assistant_text = str(run_result.get("assistant_text") or "")
if any(
line.strip() == DRVLOOP_DONE_TOKEN for line in assistant_text.splitlines()
):
print(DRVLOOP_DONE_TOKEN)
record_artifact_changes(repo_dir, memory_path)
return 0
return_code = int(run_result.get("return_code") or 0)
if return_code != 0:
stderr = str(run_result.get("stderr") or "").strip()
if stderr:
print(stderr)
_print_tagged("drvloop", f"provider exited with code {return_code}")
return return_code
if iteration >= max_iterations:
break
_print_tagged("drvloop", "max iterations reached before DONE")
return 1
[docs]
def build_drvloop_prompt(
*,
repo_dir: Path,
user_prompt: str,
artifact_changes: list[dict[str, object]],
) -> str:
skill_lines = _discover_skill_lines(repo_dir)
artifact_lines = _format_artifact_change_lines(artifact_changes)
parts = [DRVLOOP_PROMPT_PREFIX.rstrip()]
parts.append("Local derivation skills:\n" + "\n".join(skill_lines))
parts.append(
"New or modified derivation artifacts before this turn:\n"
+ "\n".join(artifact_lines)
)
parts.append("Request:\n" + user_prompt.strip())
return "\n\n".join(parts).rstrip() + "\n"
def _discover_skill_lines(repo_dir: Path) -> list[str]:
skills_root = repo_dir / "skills"
if not skills_root.is_dir():
return ["- No local `skills/` directory was found."]
skill_paths = sorted(skills_root.rglob("SKILL.md"))
if not skill_paths:
return ["- `skills/` exists, but no `SKILL.md` files were found."]
lines: list[str] = []
for path in skill_paths[:50]:
try:
rel = path.relative_to(repo_dir).as_posix()
except ValueError:
rel = str(path)
lines.append(f"- {rel}")
if len(skill_paths) > 50:
lines.append(f"- ... {len(skill_paths) - 50} additional skill file(s)")
return lines
def _format_artifact_change_lines(
artifact_changes: list[dict[str, object]],
) -> list[str]:
if not artifact_changes:
return ["- No new or modified derivation artifacts were detected."]
lines: list[str] = []
for item in artifact_changes[:50]:
path = str(item.get("path") or "")
status = str(item.get("status") or "changed")
size = item.get("size_bytes")
modified = str(item.get("modified_utc") or "")
lines.append(f"- {path} | {status} | {size} bytes | modified {modified}")
if len(artifact_changes) > 50:
lines.append(f"- ... {len(artifact_changes) - 50} additional artifact(s)")
return lines
def _run_provider_turn(
*,
repo_dir: Path,
prompt: str,
sandbox: str | None,
) -> dict[str, object]:
from fermilink import cli
runtime_policy = cli.resolve_agent_runtime_policy()
sandbox_policy = runtime_policy.sandbox_policy
sandbox_mode = runtime_policy.sandbox_mode
if isinstance(sandbox, str) and sandbox.strip():
sandbox_policy = "enforce"
sandbox_mode = sandbox.strip()
provider_bin = cli.resolve_provider_binary_override(
runtime_policy.provider,
raw_override=cli.DEFAULT_PROVIDER_BINARY_OVERRIDE,
)
return cli._run_exec_chat_turn(
repo_dir=repo_dir,
prompt=prompt,
sandbox=sandbox_mode if sandbox_policy == "enforce" else None,
provider_bin_override=provider_bin,
provider=runtime_policy.provider,
sandbox_policy=sandbox_policy,
model=runtime_policy.model,
reasoning_effort=runtime_policy.reasoning_effort,
)
def _resolve_prompt(tokens: list[str], repo_dir: Path) -> tuple[str, str | None]:
if len(tokens) == 1:
candidate = Path(tokens[0]).expanduser()
if not candidate.is_absolute():
candidate = (repo_dir / candidate).resolve()
if candidate.is_file():
text = candidate.read_text(encoding="utf-8", errors="replace").strip()
if not text:
raise ValueError(f"Prompt file is empty: {candidate}")
return text, str(candidate)
text = " ".join(tokens).strip()
if not text:
raise ValueError("Prompt is required.")
return text, None
def _positive_int(value: int, name: str) -> int:
try:
parsed = int(value)
except (TypeError, ValueError) as exc:
raise ValueError(f"{name} must be an integer.") from exc
if parsed < 1:
raise ValueError(f"{name} must be >= 1.")
return parsed
def _print_tagged(tag: str, message: str, *, stderr: bool = False) -> None:
kwargs: dict[str, object] = {"flush": True}
if stderr:
kwargs["file"] = sys.stderr
print(f"[{tag}] {message}", **kwargs)
def _build_arg_parser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(
prog="python -m fermilink.drvloop.main",
description=(
"Debug runner for the derivation loop. "
"The same implementation is used by top-level `fermilink drvloop`."
),
)
parser.add_argument(
"prompt",
nargs="+",
help="Prompt text or a markdown/text goal file path.",
)
parser.add_argument("--max-iterations", type=int, default=30)
parser.add_argument("--sandbox", default=None)
return parser
[docs]
def cmd_drvloop(args: argparse.Namespace) -> int:
repo_dir = Path.cwd().resolve()
user_prompt, prompt_file = _resolve_prompt(args.prompt, repo_dir)
config = DrvloopConfig(
repo_dir=repo_dir,
user_prompt=user_prompt,
prompt_file=prompt_file,
max_iterations=args.max_iterations,
sandbox=args.sandbox,
)
return run_drvloop(config)
[docs]
def main(argv: list[str] | None = None) -> int:
parser = _build_arg_parser()
args = parser.parse_args(argv)
return cmd_drvloop(args)
if __name__ == "__main__":
raise SystemExit(main())