# Source code for fermilink.drvloop.spec

from __future__ import annotations

import hashlib
import json
import re
from dataclasses import dataclass
from datetime import datetime, timezone
from pathlib import Path
from typing import Any

import yaml

from fermilink.drvloop.prompts import (
    DRVLOOP_MEMORY_DIRNAME,
    DRVLOOP_SPEC_FILENAME,
    DRVLOOP_STATE_DIRNAME,
)


# File name of the JSON session record that remembers the active project; the
# session helpers in this module read and write it under
# ``repo_dir / DRVLOOP_STATE_DIRNAME``.
DRVLOOP_SESSION_FILENAME = "session.json"
# Spec keys covered by the integrity hash whenever a payload does not carry
# its own non-empty ``locked_fields`` list.
_LOCKED_SPEC_FIELDS = (
    "problem_statement",
    "target_claims",
    "assumptions",
    "allowed_approximations",
    "domains",
    "boundary_conditions",
    "final_artifacts",
    "non_goals",
)


@dataclass(frozen=True)
class DerivationSpecContext:
    """Immutable snapshot of the active derivation spec.

    Attributes are documented inline below. ``ensure_derivation_spec``
    populates both paths relative to the repository root.
    """

    # Directory of the active derivation project.
    project_dir: Path
    # Location of the spec file inside that project.
    spec_path: Path
    # Parsed spec content.
    payload: dict[str, Any]
    # Report produced by ``validate_spec_integrity`` for ``payload``.
    integrity: dict[str, Any]

    @property
    def project_rel(self) -> str:
        """Return ``project_dir`` rendered with forward slashes."""
        rendered = self.project_dir.as_posix()
        return rendered

    @property
    def spec_rel(self) -> str:
        """Return ``spec_path`` rendered with forward slashes."""
        rendered = self.spec_path.as_posix()
        return rendered
def ensure_derivation_spec(
    *,
    repo_dir: Path,
    user_prompt: str,
    prompt_file: str | None,
) -> DerivationSpecContext:
    """Return the active drvloop derivation spec, creating it on first use.

    The active project directory is resolved (and created if needed). An
    existing spec file is loaded as-is; otherwise a default spec is built
    from the prompt and written to disk. The payload is then integrity
    checked and the project is recorded as the active one.

    Args:
        repo_dir: Repository root that contains the project directories.
        user_prompt: Prompt text used to name a new project and to seed the
            default spec.
        prompt_file: Recorded as ``prompt_source`` in a newly created spec.

    Returns:
        A context whose ``project_dir`` and ``spec_path`` are relative to
        ``repo_dir``.
    """
    active_dir = _resolve_active_project_dir(repo_dir, user_prompt)
    active_dir.mkdir(parents=True, exist_ok=True)
    spec_file = active_dir / DRVLOOP_SPEC_FILENAME

    if not spec_file.is_file():
        # First run for this project: seed the spec from the prompt.
        spec = _default_spec_payload(
            user_prompt=user_prompt,
            prompt_file=prompt_file,
        )
        _write_spec_payload(spec_file, spec)
    else:
        spec = _load_spec_payload(spec_file)

    report = validate_spec_integrity(spec)
    _save_active_project(repo_dir, active_dir)

    relative_project = active_dir.relative_to(repo_dir)
    relative_spec = spec_file.relative_to(repo_dir)
    return DerivationSpecContext(
        project_dir=relative_project,
        spec_path=relative_spec,
        payload=spec,
        integrity=report,
    )
def validate_spec_integrity(payload: dict[str, Any]) -> dict[str, Any]:
    """Compare the stored ``locked_hash`` with a freshly computed one.

    Args:
        payload: Parsed spec content.

    Returns:
        A report with keys ``ok``, ``expected_hash`` (the stored value,
        stripped; empty when absent), ``actual_hash``, ``locked_fields`` and
        ``status`` (``"ok"`` or ``"mismatch"``). A missing stored hash is
        reported as a mismatch.
    """
    recorded = str(payload.get("locked_hash") or "").strip()
    fields = _locked_fields_for(payload)
    recomputed = compute_locked_hash(payload, locked_fields=fields)

    # An empty stored hash can never count as a match.
    matches = recorded != "" and recorded == recomputed
    if matches:
        status = "ok"
    else:
        status = "mismatch"

    report: dict[str, Any] = {}
    report["ok"] = matches
    report["expected_hash"] = recorded
    report["actual_hash"] = recomputed
    report["locked_fields"] = list(fields)
    report["status"] = status
    return report
def compute_locked_hash(
    payload: dict[str, Any],
    *,
    locked_fields: tuple[str, ...] | None = None,
) -> str:
    """Return the SHA-256 hex digest of the payload's locked fields.

    The selected fields are serialised as compact, key-sorted, ASCII-only
    JSON so the digest does not depend on key order or whitespace. Fields
    missing from ``payload`` are hashed as ``null``.

    Args:
        payload: Parsed spec content.
        locked_fields: Field names to hash. When ``None`` or empty, the names
            are taken from ``_locked_fields_for(payload)``.

    Returns:
        The lowercase hexadecimal SHA-256 digest.

    Raises:
        TypeError: If a locked field holds a value that is neither JSON
            serialisable nor a ``date``/``datetime``.
    """
    fields = locked_fields or _locked_fields_for(payload)
    locked = {key: payload.get(key) for key in fields}
    canonical = json.dumps(
        locked,
        ensure_ascii=True,
        sort_keys=True,
        separators=(",", ":"),
        # YAML-loaded specs may contain date/datetime objects (unquoted
        # timestamps); without this hook json.dumps would raise on them.
        default=_canonical_json_default,
    )
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


def _canonical_json_default(value: Any) -> Any:
    """Serialise date/datetime values as ISO-8601 text for hashing.

    Only called by ``json.dumps`` for objects it cannot encode itself, so
    digests of payloads that were already serialisable are unaffected.
    """
    # Local import: the module only imports ``datetime``/``timezone`` at the
    # top level, and ``datetime`` is a subclass of ``date``.
    from datetime import date

    if isinstance(value, date):
        return value.isoformat()
    raise TypeError(
        f"Object of type {type(value).__name__} is not JSON serializable"
    )
def format_spec_context(context: DerivationSpecContext) -> str:
    """Render a spec context as a newline-joined bullet summary.

    The summary lists the project and spec paths, the integrity status, and
    then the target claims, assumptions, allowed approximations and domains
    taken from ``context.payload``. Sections with no entries show a
    placeholder bullet instead.

    Args:
        context: The spec context to describe.

    Returns:
        The summary text, one bullet per line, without a trailing newline.
    """
    spec = context.payload

    claim_lines: list[str] = []
    for entry in _as_list(spec.get("target_claims")):
        if not isinstance(entry, dict):
            # Free-form claim: keep it only if something is left after
            # stripping whitespace.
            stripped = str(entry).strip()
            if stripped:
                claim_lines.append(f"- {stripped}")
            continue
        label = str(entry.get("id") or "").strip() or "target"
        body = str(entry.get("claim") or "").strip()
        claim_lines.append(f"- {label}: {body}")
    if len(claim_lines) == 0:
        claim_lines = ["- No target claims recorded."]

    report = context.integrity
    summary = [
        f"- project: {context.project_rel}",
        f"- spec: {context.spec_rel}",
        f"- locked_hash_status: {report.get('status')}",
    ]
    summary.append("- target_claims:")
    summary.extend(claim_lines)
    summary.append("- assumptions:")
    summary.extend(_string_list_lines(spec.get("assumptions")))
    summary.append("- allowed_approximations:")
    summary.extend(_string_list_lines(spec.get("allowed_approximations")))
    summary.append("- domains:")
    summary.extend(_string_list_lines(spec.get("domains")))
    return "\n".join(summary)
def target_claim_ids(payload: dict[str, Any]) -> list[str]:
    """Return one identifier per entry in ``payload["target_claims"]``.

    A mapping entry with a non-blank ``id`` contributes that id (stripped).
    Every other entry gets the positional fallback ``target-<n>``, where
    ``n`` is its 1-based position in the list.

    Args:
        payload: Parsed spec content.

    Returns:
        The identifiers in claim order.
    """
    resolved: list[str] = []
    claims = _as_list(payload.get("target_claims"))
    for position, entry in enumerate(claims, start=1):
        explicit = ""
        if isinstance(entry, dict):
            explicit = str(entry.get("id") or "").strip()
        resolved.append(explicit or f"target-{position}")
    return resolved
def _locked_fields_for(payload: dict[str, Any]) -> tuple[str, ...]:
    """Return the field names covered by the payload's integrity hash.

    A non-empty ``locked_fields`` list stored in the payload takes precedence
    (entries are stringified and stripped, blanks dropped); otherwise the
    module default ``_LOCKED_SPEC_FIELDS`` is returned.
    """
    raw = payload.get("locked_fields")
    if isinstance(raw, list):
        fields = tuple(str(item).strip() for item in raw if str(item).strip())
        if fields:
            return fields
    return _LOCKED_SPEC_FIELDS


def _resolve_active_project_dir(repo_dir: Path, user_prompt: str) -> Path:
    """Pick the project directory the current run should use.

    Resolution order:
      1. the project recorded in the session file, if that directory exists;
      2. the project whose spec file was modified most recently;
      3. a new ``<UTC date>-<slug>`` directory (not created here), with a
         numeric suffix appended when the name is already taken.
    """
    session_project = _load_active_project(repo_dir)
    if session_project is not None:
        candidate = repo_dir / session_project
        if candidate.is_dir():
            return candidate

    projects_dir = repo_dir / DRVLOOP_MEMORY_DIRNAME
    specs = []
    if projects_dir.is_dir():
        for path in projects_dir.glob(f"*/{DRVLOOP_SPEC_FILENAME}"):
            try:
                specs.append((path.stat().st_mtime_ns, path.parent))
            except OSError:
                # The file vanished or is unreadable; ignore this project.
                continue
    if specs:
        # Newest spec wins; ties on mtime fall back to path ordering.
        return max(specs)[1]

    slug = _slugify(user_prompt)
    today = datetime.now(timezone.utc).date().isoformat()
    base = projects_dir / f"{today}-{slug}"
    candidate = base
    index = 2
    while candidate.exists():
        candidate = Path(f"{base}-{index}")
        index += 1
    return candidate


def _default_spec_payload(
    *, user_prompt: str, prompt_file: str | None
) -> dict[str, Any]:
    """Build the initial spec for a new project from the user's prompt.

    The stripped prompt becomes both the problem statement and the single
    target claim ``target-1``. The returned payload already carries its
    ``locked_hash``.
    """
    now = datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")
    payload: dict[str, Any] = {
        "schema_version": 1,
        "created_at_utc": now,
        "updated_at_utc": now,
        "prompt_source": prompt_file,
        "problem_statement": user_prompt.strip(),
        "target_claims": [
            {
                "id": "target-1",
                "claim": user_prompt.strip(),
            }
        ],
        "definitions": [],
        "assumptions": [],
        "allowed_approximations": [],
        "domains": [],
        "boundary_conditions": [],
        "final_artifacts": {
            "require_manuscript": True,
            "require_pedagogical_note": True,
            "manuscript_globs": [
                "final*.tex",
                "final*.md",
                "*manuscript*.tex",
                "*manuscript*.md",
                "*preprint*.tex",
                "*preprint*.md",
            ],
            "pedagogical_note_globs": [
                "*pedagogical*.tex",
                "*pedagogical*.md",
                "*supplement*.tex",
                "*supplement*.md",
                "*appendix*.tex",
                "*appendix*.md",
            ],
        },
        "non_goals": [],
        "spec_amendments": [],
        "locked_fields": list(_LOCKED_SPEC_FIELDS),
    }
    # Hash last so it covers the final values of every locked field.
    payload["locked_hash"] = compute_locked_hash(payload)
    return payload


def _load_spec_payload(path: Path) -> dict[str, Any]:
    """Load a spec file, falling back to an empty spec when it is unusable.

    Read errors, undecodable bytes, YAML syntax errors and non-mapping
    documents all yield an empty mapping. Missing list/mapping keys are then
    filled with empty defaults; keys that are present are left untouched.
    """
    try:
        payload = yaml.safe_load(path.read_text(encoding="utf-8"))
    except (OSError, UnicodeDecodeError, yaml.YAMLError):
        # UnicodeDecodeError: the file is not valid UTF-8. Treat it like any
        # other unreadable spec instead of crashing.
        payload = None
    if not isinstance(payload, dict):
        payload = {}
    payload.setdefault("schema_version", 1)
    payload.setdefault("target_claims", [])
    payload.setdefault("assumptions", [])
    payload.setdefault("allowed_approximations", [])
    payload.setdefault("domains", [])
    payload.setdefault("boundary_conditions", [])
    payload.setdefault("final_artifacts", {})
    payload.setdefault("non_goals", [])
    payload.setdefault("spec_amendments", [])
    return payload


def _write_spec_payload(path: Path, payload: dict[str, Any]) -> None:
    """Write the spec as block-style YAML, preserving key order."""
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(
        yaml.safe_dump(payload, sort_keys=False, default_flow_style=False),
        encoding="utf-8",
    )


def _load_active_project(repo_dir: Path) -> Path | None:
    """Return the relative project path stored in the session file, if any.

    Returns ``None`` when the session file is missing, unreadable, not valid
    UTF-8 JSON, not a JSON object, has no ``active_project`` entry, or names
    an absolute path or one containing ``..``.
    """
    path = repo_dir / DRVLOOP_STATE_DIRNAME / DRVLOOP_SESSION_FILENAME
    if not path.is_file():
        return None
    try:
        payload = json.loads(path.read_text(encoding="utf-8"))
    except (OSError, ValueError):
        # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
        return None
    if not isinstance(payload, dict):
        # Valid JSON that is not an object (list, string, number, ...).
        return None
    rel = str(payload.get("active_project") or "").strip()
    if not rel:
        return None
    candidate = Path(rel)
    # Refuse anything that could point outside the repository.
    if candidate.is_absolute() or ".." in candidate.parts:
        return None
    return candidate


def _save_active_project(repo_dir: Path, project_dir: Path) -> None:
    """Record ``project_dir`` as the active project in the session file.

    The path is stored relative to ``repo_dir`` when possible, otherwise as
    given, always with forward slashes.
    """
    state_dir = repo_dir / DRVLOOP_STATE_DIRNAME
    state_dir.mkdir(parents=True, exist_ok=True)
    try:
        rel = project_dir.relative_to(repo_dir).as_posix()
    except ValueError:
        # project_dir is not located under repo_dir.
        rel = project_dir.as_posix()
    path = state_dir / DRVLOOP_SESSION_FILENAME
    path.write_text(
        json.dumps({"version": 1, "active_project": rel}, indent=2, sort_keys=True)
        + "\n",
        encoding="utf-8",
    )


def _slugify(text: str) -> str:
    """Turn free text into a short lowercase, hyphen-separated slug.

    Uses at most the first eight ASCII alphanumeric words and at most 64
    characters; returns ``"derivation"`` when nothing usable remains.
    """
    words = re.findall(r"[A-Za-z0-9]+", text.lower())[:8]
    if not words:
        return "derivation"
    slug = "-".join(words)
    return slug[:64].strip("-") or "derivation"


def _as_list(value: Any) -> list[Any]:
    """Coerce a value to a list.

    Lists pass through unchanged, ``None`` and ``""`` become an empty list,
    and anything else is wrapped in a one-element list.
    """
    if isinstance(value, list):
        return value
    if value in (None, ""):
        return []
    return [value]


def _string_list_lines(value: Any) -> list[str]:
    """Render a value as ``- item`` bullets, skipping blank items.

    Returns a single ``- None recorded.`` bullet when no items remain.
    """
    items = _as_list(value)
    lines = [f"- {str(item).strip()}" for item in items if str(item).strip()]
    return lines or ["- None recorded."]