Source code for fermilink.optimize.state

from __future__ import annotations

import json
from datetime import datetime, timezone
import os
from pathlib import Path
from typing import Any


OPTIMIZE_DIRNAME = ".fermilink-optimize"
STATE_FILENAME = "state.json"
RESULTS_FILENAME = "results.tsv"
MEMORY_FILENAME = "memory.md"
WORKER_MEMORY_FILENAME = "worker_memory.md"
WORKER_BENCHMARK_FILENAME = "benchmark.worker.yaml"
PROGRAM_FILENAME = "program.md"
RUNS_DIRNAME = "runs"
AUTOGEN_DIRNAME = "autogen"
RUN_LOCK_FILENAME = "run.lock.json"
QUICK_MANIFEST_FILENAME = "quick_mode.json"
QUICK_BENCHMARK_FILENAME = "benchmark.yaml"
QUICK_RUNNER_FILENAME = "benchmark_runner.py"
QUICK_SUBMIT_FILENAME = "submit_poll_launcher.py"
QUICK_SETUP_FILENAME = "setup_env.sh"
QUICK_RUN_SCRIPT_FILENAME = "run_optimize.sh"
GOAL_MANIFEST_FILENAME = "goal_mode.json"
GOAL_ANALYSIS_FILENAME = "goal_analysis.json"
GOAL_COPY_FILENAME = "goal.md"
GOAL_BENCHMARK_FILENAME = "benchmark.yaml"
GOAL_RUNNER_FILENAME = "benchmark_runner.py"
GOAL_SUBMIT_FILENAME = "submit_poll_launcher.py"
GOAL_SETUP_FILENAME = "setup_env.sh"
GOAL_RUN_SCRIPT_FILENAME = "run_optimize.sh"
INPUTS_DIRNAME = "inputs"
GOAL_INPUTS_ALL_DIRNAME = "all"
GOAL_INPUTS_WORKER_DIRNAME = "worker"
GOAL_INPUTS_MANIFEST_FILENAME = "goal_inputs.json"

RESULTS_HEADER = "iteration\tcommit\tstatus\tprimary_metric_name\tprimary_metric_value\tdescription\n"


def _sanitize_cell(value: object) -> str:
    return " ".join(str(value or "").replace("\t", " ").split()).strip()


[docs] def utc_now_z() -> str: return datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")
[docs] def optimize_root(project_root: Path) -> Path: return project_root / OPTIMIZE_DIRNAME
[docs] def state_path(project_root: Path) -> Path: return optimize_root(project_root) / STATE_FILENAME
[docs] def results_path(project_root: Path) -> Path: return optimize_root(project_root) / RESULTS_FILENAME
[docs] def memory_path(project_root: Path) -> Path: return optimize_root(project_root) / MEMORY_FILENAME
[docs] def worker_memory_path(project_root: Path) -> Path: return optimize_root(project_root) / WORKER_MEMORY_FILENAME
[docs] def worker_benchmark_path(project_root: Path) -> Path: return optimize_root(project_root) / WORKER_BENCHMARK_FILENAME
[docs] def runs_root(project_root: Path) -> Path: return optimize_root(project_root) / RUNS_DIRNAME
[docs] def autogen_root(project_root: Path) -> Path: return optimize_root(project_root) / AUTOGEN_DIRNAME
[docs] def run_lock_path(project_root: Path) -> Path: return optimize_root(project_root) / RUN_LOCK_FILENAME
[docs] def quick_manifest_path(project_root: Path) -> Path: return autogen_root(project_root) / QUICK_MANIFEST_FILENAME
[docs] def quick_benchmark_path(project_root: Path) -> Path: return autogen_root(project_root) / QUICK_BENCHMARK_FILENAME
[docs] def quick_runner_path(project_root: Path) -> Path: return autogen_root(project_root) / QUICK_RUNNER_FILENAME
[docs] def quick_submit_launcher_path(project_root: Path) -> Path: return autogen_root(project_root) / QUICK_SUBMIT_FILENAME
[docs] def quick_setup_path(project_root: Path) -> Path: return autogen_root(project_root) / QUICK_SETUP_FILENAME
[docs] def quick_run_script_path(project_root: Path) -> Path: return autogen_root(project_root) / QUICK_RUN_SCRIPT_FILENAME
[docs] def goal_manifest_path(project_root: Path) -> Path: return autogen_root(project_root) / GOAL_MANIFEST_FILENAME
[docs] def goal_analysis_path(project_root: Path) -> Path: return autogen_root(project_root) / GOAL_ANALYSIS_FILENAME
[docs] def goal_copy_path(project_root: Path) -> Path: return autogen_root(project_root) / GOAL_COPY_FILENAME
[docs] def goal_benchmark_path(project_root: Path) -> Path: return autogen_root(project_root) / GOAL_BENCHMARK_FILENAME
[docs] def goal_runner_path(project_root: Path) -> Path: return autogen_root(project_root) / GOAL_RUNNER_FILENAME
[docs] def goal_submit_launcher_path(project_root: Path) -> Path: return autogen_root(project_root) / GOAL_SUBMIT_FILENAME
[docs] def goal_setup_path(project_root: Path) -> Path: return autogen_root(project_root) / GOAL_SETUP_FILENAME
[docs] def goal_run_script_path(project_root: Path) -> Path: return autogen_root(project_root) / GOAL_RUN_SCRIPT_FILENAME
[docs] def inputs_root(project_root: Path) -> Path: return optimize_root(project_root) / INPUTS_DIRNAME
[docs] def goal_inputs_all_root(project_root: Path) -> Path: return inputs_root(project_root) / GOAL_INPUTS_ALL_DIRNAME
[docs] def goal_inputs_worker_root(project_root: Path) -> Path: return inputs_root(project_root) / GOAL_INPUTS_WORKER_DIRNAME
[docs] def goal_inputs_manifest_path(project_root: Path) -> Path: return autogen_root(project_root) / GOAL_INPUTS_MANIFEST_FILENAME
[docs] def default_program_path(project_root: Path) -> Path: return optimize_root(project_root) / PROGRAM_FILENAME
[docs] def ensure_optimize_root(project_root: Path) -> Path: root = optimize_root(project_root) root.mkdir(parents=True, exist_ok=True) runs_root(project_root).mkdir(parents=True, exist_ok=True) return root
[docs] def ensure_autogen_root(project_root: Path) -> Path: root = autogen_root(project_root) root.mkdir(parents=True, exist_ok=True) return root
[docs] def safe_relative(path: Path, root: Path) -> str: try: return str(path.resolve().relative_to(root.resolve())).replace("\\", "/") except ValueError: return str(path.resolve())
[docs] def ensure_program_file(path: Path, *, content: str) -> bool: if path.exists(): return False path.parent.mkdir(parents=True, exist_ok=True) path.write_text(content, encoding="utf-8") return True
[docs] def ensure_results_file(path: Path) -> bool: if path.exists(): return False path.parent.mkdir(parents=True, exist_ok=True) path.write_text(RESULTS_HEADER, encoding="utf-8") return True
[docs] def write_json_file(path: Path, payload: dict[str, Any]) -> None: path.parent.mkdir(parents=True, exist_ok=True) temp_path = path.with_suffix(path.suffix + ".tmp") temp_path.write_text( json.dumps(payload, indent=2, sort_keys=True) + "\n", encoding="utf-8", ) temp_path.replace(path)
[docs] def load_json_file(path: Path) -> dict[str, Any] | None: if not path.is_file(): return None try: payload = json.loads(path.read_text(encoding="utf-8")) except (OSError, json.JSONDecodeError): return None if not isinstance(payload, dict): return None return payload
[docs] def write_run_lock(path: Path, payload: dict[str, Any]) -> None: write_json_file(path, payload)
[docs] def load_run_lock(path: Path) -> dict[str, Any] | None: return load_json_file(path)
[docs] def clear_run_lock(path: Path) -> None: try: path.unlink(missing_ok=True) except OSError: pass
[docs] def pid_is_running(pid: int) -> bool: if pid <= 0: return False try: os.kill(pid, 0) except OSError: return False return True
[docs] def ensure_executable(path: Path) -> None: try: mode = path.stat().st_mode except OSError: return if mode & 0o111: return try: path.chmod(mode | 0o111) except OSError: return
[docs] def ensure_memory_file( path: Path, *, package_id: str, benchmark_id: str, benchmark_rel: str, optimize_branch: str, ) -> bool: if path.exists(): return False started = utc_now_z() path.parent.mkdir(parents=True, exist_ok=True) initial = ( "# FermiLink Optimize Memory\n" "\n" f"- package_id: {package_id}\n" f"- benchmark_id: {benchmark_id}\n" f"- benchmark_path: {benchmark_rel}\n" f"- optimize_branch: {optimize_branch}\n" f"- started_at_utc: {started}\n" f"- last_updated_utc: {started}\n" "\n" "## Short-Term Memory (Operational)\n" "### Current objective\n" "- Improve benchmark performance while preserving correctness.\n" "### Latest accepted commit\n" "- commit: pending\n" "- primary_metric: pending\n" "### Progress log\n" "- Campaign initialized.\n" "\n" "## Long-Term Memory (Persistent)\n" "### Benchmark contract\n" f"- Benchmark file: `{benchmark_rel}`\n" "### Accepted experiments\n" "- none yet\n" "### Rejected patterns\n" "- none yet\n" "### Open hypotheses\n" "- Review `skills/` and recent results before proposing the next change.\n" ) path.write_text(initial, encoding="utf-8") return True
[docs] def reset_worker_memory_file( path: Path, *, package_id: str, benchmark_id: str, benchmark_rel: str, program_rel: str, controller_memory_rel: str, results_rel: str, worker_iteration: int, ) -> None: now = utc_now_z() path.parent.mkdir(parents=True, exist_ok=True) initial = ( "# FermiLink Optimize Worker Memory\n" "\n" f"- package_id: {package_id}\n" f"- benchmark_id: {benchmark_id}\n" f"- benchmark_path: {benchmark_rel}\n" f"- program_path: {program_rel}\n" f"- controller_memory_path: {controller_memory_rel}\n" f"- results_path: {results_rel}\n" f"- worker_iteration: {worker_iteration}\n" f"- reset_at_utc: {now}\n" "\n" "This file is reset at the start of each outer optimize iteration and " "archived under `.fermilink-optimize/runs/iter_XXXX/worker_memory.md`.\n" "\n" "## Short-Term Memory (Operational)\n" "### Current objective\n" "- Prepare exactly one candidate that is ready for authoritative benchmark evaluation.\n" "### Plan\n" "- Read benchmark/program/controller memory/results/skills.\n" "- Inspect the current implementation and pick one optimization hypothesis.\n" "- Apply focused edits only within benchmark-approved paths.\n" "- Run quick local checks or launch/poll long-running worker jobs if needed.\n" "- Update this memory with progress and finish only when the candidate is benchmark-ready.\n" "### Progress log\n" "- Worker iteration initialized.\n" "\n" "## Tactical Notes\n" "### Job tracking\n" "- none yet\n" "### Candidate summary\n" "- pending\n" "### Debug notes\n" "- none yet\n" ) path.write_text(initial, encoding="utf-8")
[docs] def archive_worker_memory(source_path: Path, run_dir: Path) -> Path | None: if not source_path.is_file(): return None run_dir.mkdir(parents=True, exist_ok=True) target_path = run_dir / WORKER_MEMORY_FILENAME target_path.write_text(source_path.read_text(encoding="utf-8"), encoding="utf-8") return target_path
[docs] def load_state(path: Path) -> dict[str, Any] | None: return load_json_file(path)
[docs] def write_state(path: Path, payload: dict[str, Any]) -> None: write_json_file(path, payload)
[docs] def append_result( path: Path, *, iteration: int, commit: str, status: str, primary_metric_name: str, primary_metric_value: float | int | str, description: str, ) -> None: rendered_value = ( f"{float(primary_metric_value):.12g}" if isinstance(primary_metric_value, (int, float)) else str(primary_metric_value) ) row = ( f"{int(iteration)}\t{commit}\t{status}\t{primary_metric_name}\t" f"{rendered_value}\t{_sanitize_cell(description)}\n" ) with path.open("a", encoding="utf-8") as handle: handle.write(row)
[docs] def recent_results_text(path: Path, *, limit: int = 8) -> str: try: lines = path.read_text(encoding="utf-8").splitlines() except OSError: return "" if not lines: return "" return "\n".join(lines[-limit:])
def _find_section_bounds(lines: list[str], heading: str) -> tuple[int, int] | None: try: start = lines.index(heading) except ValueError: return None end = len(lines) for index in range(start + 1, len(lines)): line = lines[index] if line.startswith("## ") or line.startswith("### "): end = index break return start, end def _set_section_lines(content: str, heading: str, new_lines: list[str]) -> str: lines = content.splitlines() bounds = _find_section_bounds(lines, heading) if bounds is None: if lines and lines[-1] != "": lines.append("") lines.append(heading) lines.extend(new_lines) return "\n".join(lines) + "\n" start, end = bounds updated = lines[: start + 1] + new_lines + lines[end:] return "\n".join(updated) + "\n" def _append_section_line(content: str, heading: str, entry: str) -> str: lines = content.splitlines() bounds = _find_section_bounds(lines, heading) if bounds is None: if lines and lines[-1] != "": lines.append("") lines.append(heading) lines.append(entry) return "\n".join(lines) + "\n" start, end = bounds body = lines[start + 1 : end] cleaned = [ line for line in body if line.strip() not in {"- none yet", "- Campaign initialized."} ] cleaned.append(entry) updated = lines[: start + 1] + cleaned + lines[end:] return "\n".join(updated) + "\n" def _replace_metadata_line(content: str, prefix: str, value: str) -> str: lines = content.splitlines() target = f"{prefix}{value}" for index, line in enumerate(lines): if line.startswith(prefix): lines[index] = target return "\n".join(lines) + "\n" if lines and lines[-1] != "": lines.append("") lines.append(target) return "\n".join(lines) + "\n"
[docs] def record_campaign_event( path: Path, *, commit: str, primary_metric_name: str, primary_metric_value: float | int | str, status: str, description: str, ) -> None: try: content = path.read_text(encoding="utf-8") except OSError: return now = utc_now_z() metric_text = ( f"{float(primary_metric_value):.12g}" if isinstance(primary_metric_value, (int, float)) else str(primary_metric_value) ) if status in {"baseline", "accepted"}: latest_lines = [ f"- commit: {commit}", f"- primary_metric: {primary_metric_name}={metric_text}", f"- status: {status}", ] content = _set_section_lines( content, "### Latest accepted commit", latest_lines ) content = _append_section_line( content, "### Progress log", ( f"- [{now}] {status}: {_sanitize_cell(description)} " f"({commit}, {primary_metric_name}={metric_text})" ), ) target_heading = ( "### Accepted experiments" if status in {"baseline", "accepted"} else "### Rejected patterns" ) content = _append_section_line( content, target_heading, ( f"- [{now}] {_sanitize_cell(description)} " f"({commit}, {primary_metric_name}={metric_text})" ), ) content = _replace_metadata_line(content, "- last_updated_utc: ", now) path.write_text(content, encoding="utf-8")