# Source code for fermilink.implement.goal

"""Parse implementation goal markdown into a structured specification."""

from __future__ import annotations

import re
from typing import Any


# Lowercase heading fragments that are characteristic of an implementation
# goal document. is_goal_markdown() lowercases its input and counts how many
# of these occur as plain substrings.
_GOAL_SECTION_MARKERS = [
    "## package",
    "## target",
    "## editable scope",
    "## input api",
    "## desired outputs",
    "## baseline",
    "## reference",
    "## representative workloads",
    "## build",
    "## validation",
    "## done criteria",
    "# implementation goal",
]

# Matches a markdown ATX heading of level 1-3 that starts at column 0:
# group 1 is the run of "#" characters, group 2 is the heading text.
_HEADING_RE = re.compile(r"^(#{1,3})\s+(.+)$")


def is_goal_markdown(text: str) -> bool:
    """Report whether *text* resembles an implementation goal document.

    The text is lowercased (a falsy value is treated as the empty string) and
    each entry of ``_GOAL_SECTION_MARKERS`` is looked up as a plain substring.
    The text qualifies once at least two distinct markers are present.
    """
    haystack = str(text or "").lower()
    hits = 0
    for marker in _GOAL_SECTION_MARKERS:
        if marker in haystack:
            hits += 1
            if hits >= 2:
                # Two markers are enough; further matches cannot change
                # the answer.
                return True
    return False
def _fenced_line_flags(lines: list[str]) -> list[bool]:
    """Flag every line that belongs to a closed ``` fenced code block.

    Fence markers are paired in document order with the same rule that
    ``_code_blocks`` uses: a line whose stripped form starts with three
    backticks toggles the fence. Both markers and every line between them are
    flagged. A trailing marker that never finds a partner flags nothing, so an
    unterminated fence cannot hide the headings that follow it.
    """
    flags = [False] * len(lines)
    opened_at: int | None = None
    for index, line in enumerate(lines):
        if not line.strip().startswith("```"):
            continue
        if opened_at is None:
            opened_at = index
            continue
        flags[opened_at : index + 1] = [True] * (index + 1 - opened_at)
        opened_at = None
    return flags


def _extract_sections(text: str) -> dict[str, str]:
    """Split markdown into a ``{lowercased heading: body}`` mapping.

    Every level 1-3 heading matched by ``_HEADING_RE`` starts a new section
    whose body runs up to the next such heading. Bodies are stripped. Text
    before the first heading is discarded, and a repeated heading overwrites
    the earlier body.

    Lines inside a closed fenced code block are never treated as headings.
    Without this, a shell comment such as ``# install deps`` inside a
    ```` ```bash ```` block would split the section and leave the fence
    unterminated, so ``_code_blocks`` would find no commands in it.
    """
    lines = str(text or "").splitlines()
    fenced = _fenced_line_flags(lines)
    sections: dict[str, str] = {}
    current_heading = ""
    current_lines: list[str] = []
    for line, in_fence in zip(lines, fenced):
        match = None if in_fence else _HEADING_RE.match(line)
        if match is None:
            current_lines.append(line)
            continue
        if current_heading:
            sections[current_heading] = "\n".join(current_lines).strip()
        current_heading = match.group(2).strip().lower()
        current_lines = []
    if current_heading:
        sections[current_heading] = "\n".join(current_lines).strip()
    return sections


def _text_section(sections: dict[str, str], *keys: str) -> str:
    """Return the stripped body of the first key in *keys* that is non-empty.

    Returns ``""`` when none of the keys has content.
    """
    for key in keys:
        value = str(sections.get(key, "") or "").strip()
        if value:
            return value
    return ""


def _first_line(sections: dict[str, str], *keys: str) -> str:
    """Return the stripped first line of ``_text_section(sections, *keys)``.

    Returns ``""`` when no key has content.
    """
    text = _text_section(sections, *keys)
    return text.splitlines()[0].strip() if text else ""


def _list_section(sections: dict[str, str], *keys: str) -> list[str]:
    """Return the items of the first key in *keys* that yields any.

    Each non-blank line of the section becomes one item. A single leading
    ``- ``, ``* `` or ``+ `` bullet prefix is removed and the remainder is
    stripped. Returns ``[]`` when no key yields an item.
    """
    for key in keys:
        text = str(sections.get(key, "") or "").strip()
        if not text:
            continue
        items: list[str] = []
        for line in text.splitlines():
            stripped = line.strip()
            for prefix in ("- ", "* ", "+ "):
                if stripped.startswith(prefix):
                    stripped = stripped[len(prefix) :]
                    break
            stripped = stripped.strip()
            if stripped:
                items.append(stripped)
        if items:
            return items
    return []


def _code_blocks(sections: dict[str, str], *keys: str) -> list[str]:
    """Collect the contents of closed ``` fenced blocks from the given keys.

    Blocks are gathered from every key in order, not only the first one that
    has content. Fence marker lines are excluded, each block is stripped, and
    empty blocks are dropped. A fence still open at the end of a section is
    ignored.
    """
    blocks: list[str] = []
    for key in keys:
        text = str(sections.get(key, "") or "")
        current: list[str] | None = None
        for line in text.splitlines():
            if line.strip().startswith("```"):
                if current is None:
                    current = []
                else:
                    blocks.append("\n".join(current).strip())
                    current = None
                continue
            if current is not None:
                current.append(line)
    return [block for block in blocks if block]


def _replace_or_append_section(
    text: str,
    *,
    headings: set[str],
    replacement_heading: str,
    replacement_body: str,
) -> str:
    """Replace every section whose heading is in *headings*, or append one.

    A matching heading (compared stripped and lowercased) keeps its original
    level but is rewritten as *replacement_heading*, followed by the stripped
    *replacement_body* when that is non-empty. The old section content is
    dropped, including deeper sub-headings, up to the next heading of the same
    or a shallower level. When nothing matches, a new level-2 section is
    appended after a separating blank line. The result always ends with
    exactly one newline.

    Lines inside a closed fenced code block are never treated as headings.
    Otherwise a ``# comment`` line inside a fence would end the dropped
    section early and the rest of the supposedly replaced code block would
    survive in the output.
    """
    lines = str(text or "").splitlines()
    fenced = _fenced_line_flags(lines)
    body = replacement_body.strip()
    output: list[str] = []
    index = 0
    replaced = False
    while index < len(lines):
        line = lines[index]
        match = None if fenced[index] else _HEADING_RE.match(line)
        if match is None or match.group(2).strip().lower() not in headings:
            output.append(line)
            index += 1
            continue
        replaced = True
        level = len(match.group(1))
        output.append(f"{'#' * level} {replacement_heading}")
        if body:
            output.append(body)
        index += 1
        # Skip the old section body up to the next real heading at the same
        # or a shallower level.
        while index < len(lines):
            if not fenced[index]:
                next_match = _HEADING_RE.match(lines[index])
                if next_match and len(next_match.group(1)) <= level:
                    break
            index += 1
    if not replaced:
        if output and output[-1].strip():
            output.append("")
        output.append(f"## {replacement_heading}")
        if body:
            output.append(body)
    return "\n".join(output).rstrip() + "\n"
def render_worker_visible_goal(
    text: str,
    *,
    worker_workloads: list[str],
    split_enabled: bool,
) -> str:
    """Build the worker-facing copy of goal.md with controller-only parts hidden.

    The workloads section (headed "representative workloads", "workloads",
    "cases" or "test cases") is replaced by a bullet list of the non-blank
    entries of *worker_workloads*, or by a fixed placeholder bullet when there
    are none. When *split_enabled* is true, the "validation" / "checks"
    section is also replaced by a fixed notice. A section that does not exist
    yet is appended instead.
    """
    bullet_lines = [
        f"- {entry}" for entry in worker_workloads if str(entry).strip()
    ]
    if bullet_lines:
        workload_body = "\n".join(bullet_lines)
    else:
        workload_body = (
            "- No worker-visible representative workloads are specified. "
            "Controller-only holdouts are hidden."
        )

    worker_copy = _replace_or_append_section(
        text,
        headings={"representative workloads", "workloads", "cases", "test cases"},
        replacement_heading="Representative Workloads",
        replacement_body=workload_body,
    )
    if not split_enabled:
        return worker_copy

    validation_notice = (
        "Implementation-mode validation is generated in the YAML "
        "contract. Controller-only validation commands and held-out "
        "workloads may be hidden from the worker."
    )
    return _replace_or_append_section(
        worker_copy,
        headings={"validation", "checks"},
        replacement_heading="Validation",
        replacement_body=validation_notice,
    )
def parse_goal(text: str) -> dict[str, Any]:
    """Turn the text of a goal.md file into an implementation-mode spec dict.

    Each known section is looked up under its primary heading and a few
    accepted aliases. A section that is absent yields an empty string or an
    empty list rather than an error, leaving it to the caller to infer a
    contract or to ask the user for explicit input.
    """
    sections = _extract_sections(text)
    baseline = _text_section(
        sections,
        "baseline",
        "reference",
        "baseline / reference",
        "baseline/reference",
    )

    spec: dict[str, Any] = {"raw_text": str(text or "")}
    spec["package"] = _first_line(sections, "package")
    spec["target"] = _text_section(sections, "target")
    spec["editable_scope"] = _list_section(
        sections, "editable scope", "scope", "editable paths"
    )
    spec["input_api"] = _text_section(
        sections, "input api", "api", "input interface"
    )
    spec["desired_outputs"] = _list_section(
        sections, "desired outputs", "outputs", "expected outputs"
    )
    spec["baseline_reference"] = baseline
    # A baseline is optional exactly when the goal does not provide one.
    spec["baseline_optional"] = not baseline.strip()
    spec["workloads"] = _list_section(
        sections, "representative workloads", "workloads", "cases", "test cases"
    )
    spec["validation"] = _text_section(sections, "validation", "checks")
    spec["validation_commands"] = _code_blocks(sections, "validation", "checks")
    spec["build_commands"] = _code_blocks(sections, "build", "setup", "install")
    # Legacy "pre commands" sections are still read and exposed as pre_commands.
    spec["pre_commands"] = _code_blocks(sections, "pre commands", "precommands")
    spec["done_criteria"] = _list_section(
        sections, "done criteria", "completion criteria", "acceptance criteria"
    )
    spec["non_goals"] = _list_section(sections, "non goals", "non-goals")
    spec["language"] = _first_line(sections, "language")
    spec["notes"] = _text_section(sections, "notes")
    return spec