Source code for fermilink.implement.validation

from __future__ import annotations

import copy
import json
import os
import subprocess
import time
from pathlib import Path
from typing import Any

from fermilink.optimize import git as optimize_git

from . import contract as implement_contract


def _normalize_rel_path(value: str) -> str:
    """Return *value* as a forward-slash path without leading slashes.

    Falsy input is treated as an empty string.  Surrounding whitespace is
    stripped first, then backslashes become forward slashes, and finally any
    leading ``/`` characters are removed.
    """
    trimmed = str(value or "").strip()
    forward_slashed = trimmed.replace("\\", "/")
    return forward_slashed.lstrip("/")


def expand_command(
    command: list[str],
    *,
    project_root: Path,
    run_dir: Path,
    contract_path: Path,
) -> list[str]:
    """Return a copy of *command* with path placeholders filled in.

    Every token is converted with ``str()`` and then has the literal
    placeholders ``{project_root}``, ``{run_dir}`` and ``{contract}`` replaced
    (in that order) by the string form of the corresponding path argument.
    The input list is not modified.
    """
    substitutions = (
        ("{project_root}", str(project_root)),
        ("{run_dir}", str(run_dir)),
        ("{contract}", str(contract_path)),
    )

    def _render(token: object) -> str:
        # Apply the substitutions sequentially, in declaration order.
        text = str(token)
        for placeholder, replacement in substitutions:
            text = text.replace(placeholder, replacement)
        return text

    return [_render(token) for token in command]
def _changed_signatures(repo_dir: Path) -> set[tuple[str, str]]:
    """Collect ``(status, path)`` pairs for the changed paths of *repo_dir*.

    Entries are read from ``optimize_git.list_changed_paths``.  Paths are
    normalized with ``_normalize_rel_path`` and statuses are stripped.  An
    entry is skipped when its path or status is empty, or when its status is
    ``"??"`` (presumably the marker for untracked files -- confirm against
    ``optimize_git``).
    """
    collected: set[tuple[str, str]] = set()
    for record in optimize_git.list_changed_paths(repo_dir):
        rel_path = _normalize_rel_path(str(record.get("path") or ""))
        state = str(record.get("status") or "").strip()
        if not rel_path or not state or state == "??":
            continue
        collected.add((state, rel_path))
    return collected
def _pre_command_output_text(raw: object) -> str:
    """Return captured subprocess output as text suitable for a log file.

    ``subprocess.TimeoutExpired`` carries its captured output as ``bytes``
    even when the child was started with ``text=True``.  Passing that through
    ``str()`` would log the ``b'...'`` repr instead of the real output, so
    bytes are decoded (undecodable bytes are replaced).  ``None`` becomes "".
    """
    if raw is None:
        return ""
    if isinstance(raw, (bytes, bytearray)):
        return bytes(raw).decode("utf-8", errors="replace")
    return str(raw)


def run_pre_commands(
    project_root: Path,
    *,
    commands: list[list[str]],
    run_dir: Path,
    timeout_seconds: int,
    log_prefix: str,
    contract_path: Path,
) -> dict[str, Any] | None:
    """Run deterministic pre-commands and reject tracked source side effects.

    Each command template is expanded with ``expand_command`` and executed
    with ``project_root`` as the working directory.  The stdout/stderr of
    command ``i`` (1-based) are written to ``{log_prefix}_{i}.stdout.log`` and
    ``{log_prefix}_{i}.stderr.log`` inside ``run_dir``.

    Returns:
        ``None`` when ``commands`` is empty, or when every command exits 0 and
        ``_changed_signatures`` reports no entries beyond those present before
        the first command ran.  Otherwise a failure dict with ``ok=False``,
        ``hard_reject=True`` and a ``status`` of ``pre_command_timeout``,
        ``pre_command_crash``, ``pre_command_failed`` or
        ``pre_command_side_effect``.  Execution stops at the first failure.
    """
    if not commands:
        return None
    run_dir.mkdir(parents=True, exist_ok=True)
    # Snapshot pre-existing changes so only changes introduced by the
    # pre-commands themselves are reported as side effects.
    baseline_changed = _changed_signatures(project_root)
    env = os.environ.copy()
    last_stdout = run_dir / f"{log_prefix}_0.stdout.log"
    last_stderr = run_dir / f"{log_prefix}_0.stderr.log"
    for index, command_template in enumerate(commands, start=1):
        command = expand_command(
            command_template,
            project_root=project_root,
            run_dir=run_dir,
            contract_path=contract_path,
        )
        last_stdout = run_dir / f"{log_prefix}_{index}.stdout.log"
        last_stderr = run_dir / f"{log_prefix}_{index}.stderr.log"
        try:
            completed = subprocess.run(
                command,
                cwd=str(project_root),
                text=True,
                capture_output=True,
                env=env,
                timeout=timeout_seconds,
                check=False,
            )
        except subprocess.TimeoutExpired as exc:
            # Output attached to TimeoutExpired is bytes; decode before logging.
            last_stdout.write_text(
                _pre_command_output_text(exc.stdout), encoding="utf-8"
            )
            last_stderr.write_text(
                _pre_command_output_text(exc.stderr), encoding="utf-8"
            )
            return {
                "ok": False,
                "status": "pre_command_timeout",
                "reason": f"{log_prefix}[{index}] timed out",
                "command": command,
                "stdout_log": str(last_stdout),
                "stderr_log": str(last_stderr),
                "hard_reject": True,
            }
        except (OSError, ValueError) as exc:
            # The process could not be started (or arguments were invalid).
            last_stdout.write_text("", encoding="utf-8")
            last_stderr.write_text(str(exc), encoding="utf-8")
            return {
                "ok": False,
                "status": "pre_command_crash",
                "reason": str(exc),
                "command": command,
                "stdout_log": str(last_stdout),
                "stderr_log": str(last_stderr),
                "hard_reject": True,
            }
        last_stdout.write_text(str(completed.stdout or ""), encoding="utf-8")
        last_stderr.write_text(str(completed.stderr or ""), encoding="utf-8")
        if completed.returncode != 0:
            return {
                "ok": False,
                "status": "pre_command_failed",
                "reason": f"{log_prefix}[{index}] exited {completed.returncode}",
                "return_code": int(completed.returncode),
                "command": command,
                "stdout_log": str(last_stdout),
                "stderr_log": str(last_stderr),
                "hard_reject": True,
            }
    post_changed = _changed_signatures(project_root)
    new_changes = sorted(post_changed - baseline_changed)
    if new_changes:
        rendered = [
            {"status": status, "path": path} for status, path in new_changes if path
        ]
        return {
            "ok": False,
            "status": "pre_command_side_effect",
            "reason": "pre_commands left tracked repository changes",
            "tracked_changes": rendered,
            "stdout_log": str(last_stdout),
            "stderr_log": str(last_stderr),
            "hard_reject": True,
        }
    return None
def _parse_json_from_stdout(stdout_text: str) -> dict[str, Any] | None:
    """Extract a JSON object from captured stdout, or return ``None``.

    The whole (stripped) text is tried first.  Only if that fails to decode
    is the last non-blank line tried on its own.  A successfully decoded
    value that is not a ``dict`` yields ``None``.
    """
    body = str(stdout_text or "").strip()
    if not body:
        return None

    def _try_decode(fragment: str) -> tuple[bool, Any]:
        # Report success separately because ``None`` is a valid JSON value.
        try:
            return True, json.loads(fragment)
        except json.JSONDecodeError:
            return False, None

    decoded_ok, decoded = _try_decode(body)
    if not decoded_ok:
        non_blank = [
            stripped
            for stripped in (line.strip() for line in body.splitlines())
            if stripped
        ]
        if not non_blank:
            return None
        decoded_ok, decoded = _try_decode(non_blank[-1])
        if not decoded_ok:
            return None
    return decoded if isinstance(decoded, dict) else None


def _normalize_milestone(raw: object) -> dict[str, Any] | None:
    """Coerce one milestone entry to ``{id, status, score, notes}``.

    Non-dict input yields ``None``.  The id falls back from ``id`` to ``name``
    to the literal ``"milestone"``.  A missing status is derived from ``ok``
    (``"pass"`` or ``"unknown"``).  The score is 0.0 unless it is a real
    number (booleans are not accepted).  Notes fall back from ``notes`` to
    ``reason``.
    """
    if not isinstance(raw, dict):
        return None
    identifier = str(raw.get("id") or raw.get("name") or "").strip() or "milestone"
    state = str(raw.get("status") or "").strip().lower()
    if not state:
        state = "pass" if bool(raw.get("ok", False)) else "unknown"
    raw_score = raw.get("score")
    numeric = isinstance(raw_score, (int, float)) and not isinstance(raw_score, bool)
    return {
        "id": identifier,
        "status": state,
        "score": float(raw_score) if numeric else 0.0,
        "notes": str(raw.get("notes") or raw.get("reason") or "").strip(),
    }


def _normalize_validation_payload(
    payload: dict[str, Any],
    *,
    fallback_ok: bool,
    fallback_status: str,
    fallback_score: float,
) -> dict[str, Any]:
    """Project a raw validation payload onto the fixed result schema.

    Missing ``ok``/``build_ok``/``api_ok``/``scientific_checks_ok`` values
    default to ``fallback_ok``; a missing or empty ``status`` defaults to
    ``fallback_status``; a non-numeric ``score`` defaults to
    ``fallback_score``.  ``scientific_checks_ok`` is passed through without
    boolean coercion.  ``observables`` is deep-copied when it is a dict.
    """
    raw_score = payload.get("score")
    if isinstance(raw_score, (int, float)) and not isinstance(raw_score, bool):
        resolved_score = float(raw_score)
    else:
        resolved_score = fallback_score

    raw_milestones = payload.get("milestones")
    milestones: list[dict[str, Any]] = []
    if isinstance(raw_milestones, list):
        milestones = [
            entry
            for entry in map(_normalize_milestone, raw_milestones)
            if entry is not None
        ]

    raw_cases = payload.get("cases")
    raw_observables = payload.get("observables")
    raw_errors = payload.get("errors")

    observables = (
        copy.deepcopy(raw_observables) if isinstance(raw_observables, dict) else {}
    )
    errors = (
        [str(entry) for entry in raw_errors] if isinstance(raw_errors, list) else []
    )

    return {
        "ok": bool(payload.get("ok", fallback_ok)),
        "status": str(payload.get("status") or fallback_status),
        "score": resolved_score,
        "complete": bool(payload.get("complete", False)),
        "build_ok": bool(payload.get("build_ok", fallback_ok)),
        "api_ok": bool(payload.get("api_ok", fallback_ok)),
        "scientific_checks_ok": payload.get("scientific_checks_ok", fallback_ok),
        "milestones": milestones,
        "cases": raw_cases if isinstance(raw_cases, list) else [],
        "observables": observables,
        "errors": errors,
    }
def _validation_output_text(raw: object) -> str:
    """Return captured subprocess output as text suitable for a log file.

    ``subprocess.TimeoutExpired`` exposes captured output as ``bytes`` even
    when the child was run with ``text=True``; ``str()`` on bytes would log
    the ``b'...'`` repr, so bytes are decoded here (undecodable bytes are
    replaced).  ``None`` becomes the empty string.
    """
    if raw is None:
        return ""
    if isinstance(raw, (bytes, bytearray)):
        return bytes(raw).decode("utf-8", errors="replace")
    return str(raw)


def run_validation_suite(
    project_root: Path,
    *,
    contract_payload: dict[str, Any],
    contract_path: Path,
    run_dir: Path,
    timeout_seconds: int,
) -> dict[str, Any]:
    """Run controller pre-commands and progressive validation commands.

    Flow:
        1. Controller pre-commands run first (logs under
           ``run_dir/controller_pre_commands``).  A failure short-circuits
           with a normalized result marked ``hard_reject=True``; the failure's
           diagnostic fields (``reason``, ``command``, ``return_code``, log
           paths, ``tracked_changes``) are preserved on that result.
        2. If the contract declares no validation commands, a
           ``no_validation_commands`` result is returned.
        3. Otherwise every validation command is run (a failing command does
           not stop the loop) with stdout/stderr written to
           ``validation_{i}.stdout.log`` / ``validation_{i}.stderr.log``.
        4. If any command printed a JSON object, the last such object is
           normalized into the result; otherwise the result is derived from
           the pass ratio of the commands.

    The result is always written via ``_write_validation_result`` before it
    is returned.
    """
    run_dir.mkdir(parents=True, exist_ok=True)
    pre_failure = run_pre_commands(
        project_root,
        commands=implement_contract.pre_commands(contract_payload, "controller"),
        run_dir=run_dir / "controller_pre_commands",
        timeout_seconds=timeout_seconds,
        log_prefix="controller_pre_command",
        contract_path=contract_path,
    )
    if pre_failure is not None:
        result = _normalize_validation_payload(
            pre_failure,
            fallback_ok=False,
            fallback_status=str(pre_failure.get("status") or "pre_command_failed"),
            fallback_score=0.0,
        )
        # Normalization keeps only the scoring schema.  Carry the failure
        # diagnostics over so consumers that read ``reason`` (and the log
        # paths) can report why the run was hard-rejected.
        for diagnostic_key in (
            "reason",
            "command",
            "return_code",
            "stdout_log",
            "stderr_log",
            "tracked_changes",
        ):
            if diagnostic_key in pre_failure:
                result[diagnostic_key] = pre_failure[diagnostic_key]
        result["hard_reject"] = True
        _write_validation_result(run_dir, result)
        return result

    commands = implement_contract.validation_commands(contract_payload)
    if not commands:
        result = {
            "ok": True,
            "status": "no_validation_commands",
            "score": 0.0,
            "complete": False,
            "commands_ok": False,
            "build_ok": True,
            "api_ok": False,
            "scientific_checks_ok": "unknown",
            "milestones": [],
            "cases": [],
            "observables": {},
            "errors": ["contract validation.commands is empty"],
            "hard_reject": False,
        }
        _write_validation_result(run_dir, result)
        return result

    command_results: list[dict[str, Any]] = []
    pass_count = 0
    json_payloads: list[dict[str, Any]] = []
    for index, command_template in enumerate(commands, start=1):
        command = expand_command(
            command_template,
            project_root=project_root,
            run_dir=run_dir,
            contract_path=contract_path,
        )
        stdout_path = run_dir / f"validation_{index}.stdout.log"
        stderr_path = run_dir / f"validation_{index}.stderr.log"
        started = time.perf_counter()
        try:
            completed = subprocess.run(
                command,
                cwd=str(project_root),
                text=True,
                capture_output=True,
                timeout=timeout_seconds,
                check=False,
            )
            elapsed = max(0.0, time.perf_counter() - started)
            stdout_text = str(completed.stdout or "")
            stderr_text = str(completed.stderr or "")
            stdout_path.write_text(stdout_text, encoding="utf-8")
            stderr_path.write_text(stderr_text, encoding="utf-8")
            passed = completed.returncode == 0
            if passed:
                pass_count += 1
            parsed = _parse_json_from_stdout(stdout_text)
            if parsed is not None:
                json_payloads.append(parsed)
            command_results.append(
                {
                    "id": f"validation-{index}",
                    "command": command,
                    "return_code": int(completed.returncode),
                    "passed": passed,
                    "elapsed_seconds": elapsed,
                    "stdout_log": str(stdout_path),
                    "stderr_log": str(stderr_path),
                    "parsed_json": parsed if parsed is not None else {},
                }
            )
        except subprocess.TimeoutExpired as exc:
            # Output attached to TimeoutExpired is bytes; decode before logging.
            stdout_path.write_text(
                _validation_output_text(exc.stdout), encoding="utf-8"
            )
            stderr_path.write_text(
                _validation_output_text(exc.stderr), encoding="utf-8"
            )
            command_results.append(
                {
                    "id": f"validation-{index}",
                    "command": command,
                    "return_code": 124,
                    "passed": False,
                    "elapsed_seconds": float(timeout_seconds),
                    "stdout_log": str(stdout_path),
                    "stderr_log": str(stderr_path),
                    "parsed_json": {},
                    "error": "timeout",
                }
            )
        except (OSError, ValueError) as exc:
            stdout_path.write_text("", encoding="utf-8")
            stderr_path.write_text(str(exc), encoding="utf-8")
            command_results.append(
                {
                    "id": f"validation-{index}",
                    "command": command,
                    "return_code": 1,
                    "passed": False,
                    "elapsed_seconds": 0.0,
                    "stdout_log": str(stdout_path),
                    "stderr_log": str(stderr_path),
                    "parsed_json": {},
                    "error": str(exc),
                }
            )

    total = len(commands)
    all_passed = pass_count == total
    if json_payloads:
        # Prefer the final structured payload; validation drivers can aggregate
        # their own detailed scoring more accurately than the generic runner.
        result = _normalize_validation_payload(
            json_payloads[-1],
            fallback_ok=all_passed,
            fallback_status="ok" if all_passed else "partial",
            fallback_score=0.0,
        )
    else:
        # No structured output: score by the share of commands that exited 0.
        score = 100.0 * float(pass_count) / float(total)
        result = {
            "ok": all_passed,
            "status": "ok" if all_passed else "partial",
            "score": score,
            "complete": all_passed,
            "build_ok": pass_count > 0,
            "api_ok": all_passed,
            "scientific_checks_ok": all_passed,
            "milestones": [
                {
                    "id": item["id"],
                    "status": "pass" if item["passed"] else "fail",
                    "score": 100.0 / float(total) if item["passed"] else 0.0,
                    "notes": "",
                }
                for item in command_results
            ],
            "cases": [],
            "observables": {},
            "errors": [
                str(item.get("error") or "")
                for item in command_results
                if not item.get("passed") and str(item.get("error") or "").strip()
            ],
        }
    result["commands_ok"] = all_passed
    result["command_results"] = command_results
    result["hard_reject"] = bool(result.get("hard_reject", False))
    _write_validation_result(run_dir, result)
    return result
def _write_validation_result(run_dir: Path, result: dict[str, Any]) -> None:
    """Persist *result* as ``validation_result.json`` inside *run_dir*.

    The directory is created if needed.  The JSON is written with two-space
    indentation, sorted keys, a trailing newline, and UTF-8 encoding.
    """
    run_dir.mkdir(parents=True, exist_ok=True)
    target = run_dir / "validation_result.json"
    serialized = json.dumps(result, indent=2, sort_keys=True)
    target.write_text(serialized + "\n", encoding="utf-8")
def validation_score(result: dict[str, Any] | None) -> float:
    """Return the numeric ``score`` of *result* as a float, else ``0.0``.

    ``0.0`` is returned when *result* is not a dict or when its ``score`` is
    missing, non-numeric, or a boolean.
    """
    if isinstance(result, dict):
        candidate = result.get("score")
        # bool is a subclass of int, so it has to be excluded explicitly.
        if not isinstance(candidate, bool) and isinstance(candidate, (int, float)):
            return float(candidate)
    return 0.0
def validation_complete(result: dict[str, Any] | None) -> bool:
    """Return the truthiness of ``result["complete"]``.

    Returns ``False`` when *result* is not a dict or has no ``complete`` key.
    """
    if not isinstance(result, dict):
        return False
    return bool(result.get("complete"))
def _validation_flag_true(result: dict[str, Any], key: str) -> bool:
    """Interpret ``result[key]`` as a pass/fail flag.

    String values count as true only when, after stripping and lower-casing,
    they are one of ``true``, ``ok``, ``pass``, ``passed``, ``complete`` or
    ``completed``.  Any other value is judged by its ordinary truthiness.
    """
    flag = result.get(key)
    if not isinstance(flag, str):
        return bool(flag)
    affirmative = ("true", "ok", "pass", "passed", "complete", "completed")
    return flag.strip().lower() in affirmative
def final_integrity_ok(result: dict[str, Any] | None) -> bool:
    """Return True when a completed result passes every integrity flag.

    The result must be a dict that reports ``complete``; if it contains a
    ``commands_ok`` key that value must be truthy; and ``ok``, ``build_ok``,
    ``api_ok`` and ``scientific_checks_ok`` must all read as true according to
    ``_validation_flag_true``.
    """
    if not isinstance(result, dict):
        return False
    if not validation_complete(result):
        return False
    # ``commands_ok`` is only enforced when the key is present.
    if "commands_ok" in result and not bool(result.get("commands_ok")):
        return False
    required_flags = ("ok", "build_ok", "api_ok", "scientific_checks_ok")
    return all(_validation_flag_true(result, flag) for flag in required_flags)
# Requirement verdicts (already stripped and lower-cased) that count as passing.
_CONTROLLER_REVIEW_PASS_VERDICTS = {
    "pass",
    "passed",
    "ok",
    "satisfied",
    "complete",
    "completed",
}


def _controller_review_decision(review: dict[str, Any] | None) -> str | None:
    """Return the review's decision as ``"ACCEPTED"``/``"REJECTED"``, else None.

    The ``decision`` value is stripped and upper-cased before comparison.
    ``None`` is returned for non-dict input and for any other decision text.
    """
    if not isinstance(review, dict):
        return None
    normalized = str(review.get("decision") or "").strip().upper()
    if normalized in ("ACCEPTED", "REJECTED"):
        return normalized
    return None


def _controller_review_requirement_evidence(requirement: dict[str, Any]) -> list[str]:
    """Return the non-blank evidence strings attached to *requirement*.

    ``evidence`` may be a single string or a list.  List items are converted
    with ``str()`` and stripped, and blank results are dropped.  Any other
    type yields an empty list.
    """
    raw = requirement.get("evidence")
    if isinstance(raw, str):
        single = raw.strip()
        if single:
            return [single]
        return []
    if isinstance(raw, list):
        stripped_items = (str(entry).strip() for entry in raw)
        return [text for text in stripped_items if text]
    return []
def controller_review_final_ok(review: dict[str, Any] | None) -> bool:
    """Return True when the controller independently proves final satisfaction.

    All of the following must hold:

    * *review* is a dict whose decision normalizes to ``"ACCEPTED"``;
    * its ``final_complete`` flag reads as true;
    * ``requirements`` is a non-empty list containing at least one dict entry
      that is required (``required`` defaults to true);
    * every such required entry has a passing ``verdict`` (falling back to
      ``status``) and at least one non-blank evidence string.

    Non-dict entries and entries with a falsy ``required`` are ignored.
    """
    if not isinstance(review, dict):
        return False
    if _controller_review_decision(review) != "ACCEPTED":
        return False
    if not _validation_flag_true(review, "final_complete"):
        return False

    requirements = review.get("requirements")
    if not isinstance(requirements, list) or not requirements:
        return False

    mandatory = [
        entry
        for entry in requirements
        if isinstance(entry, dict) and bool(entry.get("required", True))
    ]
    if not mandatory:
        return False

    for entry in mandatory:
        raw_verdict = entry.get("verdict") or entry.get("status") or ""
        verdict = str(raw_verdict).strip().lower()
        if verdict not in _CONTROLLER_REVIEW_PASS_VERDICTS:
            return False
        if not _controller_review_requirement_evidence(entry):
            return False
    return True
def acceptance_decision(
    *,
    contract_payload: dict[str, Any],
    incumbent_validation: dict[str, Any],
    candidate_validation: dict[str, Any],
    controller_decision: str | None,
    controller_review: dict[str, Any] | None = None,
    hard_reject: bool,
    hard_reason: str,
) -> dict[str, Any]:
    """Decide whether a candidate replaces the incumbent.

    Checks are applied in this order; the first that matches wins:

    1. ``hard_reject`` is set -> rejected with ``hard_reason``.
    2. The effective controller decision (the structured review's decision
       when it has one, otherwise ``controller_decision``) is not
       ``"ACCEPTED"`` -> rejected.
    3. The candidate validation carries ``hard_reject`` -> rejected.
    4. The candidate reports ``complete`` but fails ``final_integrity_ok``
       -> rejected.
    5. The candidate is complete and the controller review proves final
       satisfaction -> accepted with status ``complete``.
    6. The candidate score exceeds the incumbent score by more than the
       contract's ``min_score_improvement`` (clamped to >= 0) -> accepted
       with status ``accepted_partial``.
    7. Otherwise -> rejected.

    Returns a dict with keys ``accepted``, ``final_complete``, ``status`` and
    ``reason``.
    """

    def _rejected(reason: str) -> dict[str, Any]:
        # Every rejection shares the same shape; only the reason varies.
        return {
            "accepted": False,
            "final_complete": False,
            "status": "rejected",
            "reason": reason,
        }

    if hard_reject:
        return _rejected(hard_reason or "hard guard rejection")

    review_decision = _controller_review_decision(controller_review)
    if (review_decision or controller_decision) != "ACCEPTED":
        return _rejected("controller rejected candidate")

    if bool(candidate_validation.get("hard_reject")):
        return _rejected(
            str(candidate_validation.get("reason") or "validation hard reject")
        )

    configured_delta = implement_contract.scoring_config(contract_payload).get(
        "min_score_improvement", 0.0
    )
    try:
        threshold = max(0.0, float(configured_delta))
    except (TypeError, ValueError):
        threshold = 0.0

    previous = validation_score(incumbent_validation)
    current = validation_score(candidate_validation)
    claims_complete = validation_complete(candidate_validation)

    if claims_complete and not final_integrity_ok(candidate_validation):
        return _rejected(
            "candidate reported complete=true without ok/build_ok/api_ok/"
            "scientific_checks_ok all passing"
        )

    review_proves_final = controller_review_final_ok(controller_review)
    is_final = claims_complete and review_proves_final
    is_better = current > previous + threshold
    # Validation says "done" but the review did not back it with evidence.
    unproven_completion = claims_complete and not review_proves_final

    if is_final:
        return {
            "accepted": True,
            "final_complete": True,
            "status": "complete",
            "reason": "candidate satisfies final done criteria and controller review",
        }

    if is_better:
        if unproven_completion:
            reason = (
                "validation reported complete, but controller review did not "
                "provide structured final target-satisfaction evidence; "
                f"accepting partial progress ({previous:.6g} -> {current:.6g})"
            )
        else:
            reason = f"score improved from {previous:.6g} to {current:.6g}"
        return {
            "accepted": True,
            "final_complete": False,
            "status": "accepted_partial",
            "reason": reason,
        }

    if unproven_completion:
        return _rejected(
            "validation reported complete, but controller review did not "
            "provide structured final target-satisfaction evidence"
        )
    return _rejected(f"score did not improve ({previous:.6g} -> {current:.6g})")