Source code for fermilink.implement.git

from __future__ import annotations

import hashlib
import re
import shutil
from pathlib import Path

from fermilink.optimize import git as optimize_git


WORKER_BRANCH_PREFIX = "fermilink-implement-worker/"
WORKER_WORKTREE_STORAGE_DIRNAME = "fermilink-implement-worktrees"
WORKER_GIT_HIDDEN_BASENAME = optimize_git.WORKER_GIT_HIDDEN_BASENAME

run_git = optimize_git.run_git
current_branch = optimize_git.current_branch
head_sha = optimize_git.head_sha
branch_exists = optimize_git.branch_exists
ensure_clean_repo = optimize_git.ensure_clean_repo
ensure_local_excludes = optimize_git.ensure_local_excludes
list_changed_paths = optimize_git.list_changed_paths
list_untracked_paths = optimize_git.list_untracked_paths
cleanup_paths = optimize_git.cleanup_paths
commit_paths = optimize_git.commit_paths
reset_to_commit = optimize_git.reset_to_commit
with_worker_git_disabled = optimize_git.with_worker_git_disabled
restore_worker_git_metadata = optimize_git.restore_worker_git_metadata
inspect_worker_git_metadata = optimize_git.inspect_worker_git_metadata
reset_worker_to_commit = optimize_git.reset_worker_to_commit
clean_worker_untracked = optimize_git.clean_worker_untracked
temporary_implement_agents = optimize_git.temporary_optimize_agents
cleanup_stale_temporary_implement_agents = (
    optimize_git.cleanup_stale_temporary_optimize_agents
)


def _cli():
    from fermilink import cli

    return cli


def _git_toplevel(repo_dir: Path) -> Path:
    completed = run_git(repo_dir, ["rev-parse", "--show-toplevel"])
    raw = str(completed.stdout or "").strip()
    if not raw:
        return repo_dir.resolve()
    return Path(raw).resolve()


def _worker_key(controller_branch: str) -> str:
    branch = str(controller_branch or "").strip() or "implement"
    slug = re.sub(r"[^A-Za-z0-9._-]+", "-", branch).strip("-._")
    if not slug:
        slug = "implement"
    digest = hashlib.sha1(branch.encode("utf-8")).hexdigest()[:12]
    return f"{slug[:48]}-{digest}"


[docs] def worker_branch_name(controller_branch: str) -> str: return f"{WORKER_BRANCH_PREFIX}{_worker_key(controller_branch)}"
[docs] def worker_storage_root(repo_dir: Path) -> Path: root = _git_toplevel(repo_dir) repo_name = re.sub(r"[^A-Za-z0-9._-]+", "-", root.name).strip("-._") or "repo" return (root.parent / f".{repo_name}-{WORKER_WORKTREE_STORAGE_DIRNAME}").resolve()
[docs] def worker_worktree_path(repo_dir: Path, *, controller_branch: str) -> Path: return worker_storage_root(repo_dir) / _worker_key(controller_branch)
[docs] def checkout_implement_branch( repo_dir: Path, *, branch_name: str, ) -> dict[str, str | bool | None]: original_branch = current_branch(repo_dir) created = False if original_branch != branch_name: if branch_exists(repo_dir, branch_name): run_git(repo_dir, ["checkout", branch_name]) else: run_git(repo_dir, ["checkout", "-b", branch_name]) created = True return { "original_branch": original_branch, "active_branch": current_branch(repo_dir), "created": created, }
def _list_worktrees(repo_dir: Path) -> list[dict[str, str]]: completed = run_git(repo_dir, ["worktree", "list", "--porcelain"]) entries: list[dict[str, str]] = [] current: dict[str, str] = {} for raw_line in str(completed.stdout or "").splitlines(): if not raw_line.strip(): if current: entries.append(current) current = {} continue if raw_line.startswith("worktree "): if current: entries.append(current) current = {"worktree": raw_line.split(" ", 1)[1].strip()} elif raw_line.startswith("HEAD "): current["head"] = raw_line.split(" ", 1)[1].strip() elif raw_line.startswith("branch "): current["branch"] = raw_line.split(" ", 1)[1].strip() if current: entries.append(current) return entries def _existing_worktree_for_branch(repo_dir: Path, *, branch_name: str) -> Path | None: branch_ref = f"refs/heads/{branch_name}" for entry in _list_worktrees(repo_dir): if entry.get("branch") != branch_ref: continue raw = str(entry.get("worktree") or "").strip() if not raw: continue path = Path(raw).resolve() if path.exists(): return path return None def _remove_path(path: Path) -> None: if not (path.exists() or path.is_symlink()): return if path.is_dir() and not path.is_symlink(): shutil.rmtree(path) else: path.unlink(missing_ok=True)
[docs] def ensure_worker_worktree( repo_dir: Path, *, controller_branch: str, start_commit: str | None = None, ) -> dict[str, str | bool]: worker_branch = worker_branch_name(controller_branch) baseline_commit = str(start_commit or "").strip() or head_sha(repo_dir) if not branch_exists(repo_dir, worker_branch): run_git(repo_dir, ["branch", worker_branch, baseline_commit]) created_branch = True else: created_branch = False desired_root = worker_worktree_path(repo_dir, controller_branch=controller_branch) desired_root.parent.mkdir(parents=True, exist_ok=True) existing = _existing_worktree_for_branch(repo_dir, branch_name=worker_branch) if existing is not None and existing.resolve() != desired_root.resolve(): run_git( repo_dir, ["worktree", "remove", "--force", str(existing)], check=False, ) existing = None if existing is None: if desired_root.exists() or desired_root.is_symlink(): _remove_path(desired_root) run_git( repo_dir, ["worktree", "add", "--force", str(desired_root), worker_branch], ) created_worktree = True worker_root = desired_root else: worker_root = existing created_worktree = False restore_worker_git_metadata(worker_root) return { "worker_branch": worker_branch, "worker_root": str(worker_root), "created_branch": created_branch, "created_worktree": created_worktree, }