from __future__ import annotations
import json
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
from fermilink.exploop.memory import append_to_memory_section
from fermilink.exploop.prompts import (
EXPLOOP_LEGACY_STATE_DIRNAME,
EXPLOOP_MEMORY_DIRNAME,
EXPLOOP_MEMORY_FILENAME,
EXPLOOP_STATE_DIRNAME,
EXPLOOP_STATE_FILENAME,
)
def _utc_now_z() -> str:
return datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")
def _mtime_utc(path: Path) -> str:
return (
datetime.fromtimestamp(path.stat().st_mtime, timezone.utc)
.isoformat()
.replace("+00:00", "Z")
)
[docs]
def state_path_for(repo_dir: Path) -> Path:
return repo_dir / EXPLOOP_STATE_DIRNAME / EXPLOOP_STATE_FILENAME
[docs]
def legacy_state_path_for(repo_dir: Path) -> Path:
return repo_dir / EXPLOOP_LEGACY_STATE_DIRNAME / EXPLOOP_STATE_FILENAME
[docs]
def load_state(repo_dir: Path) -> dict[str, Any]:
path = state_path_for(repo_dir)
if not path.is_file():
legacy_path = legacy_state_path_for(repo_dir)
if legacy_path.is_file():
path = legacy_path
if not path.is_file():
return {"version": 1, "known_artifacts": {}}
try:
payload = json.loads(path.read_text(encoding="utf-8"))
except (OSError, json.JSONDecodeError):
return {"version": 1, "known_artifacts": {}}
if not isinstance(payload, dict):
return {"version": 1, "known_artifacts": {}}
if not isinstance(payload.get("known_artifacts"), dict):
payload["known_artifacts"] = {}
payload.setdefault("version", 1)
return payload
[docs]
def save_state(repo_dir: Path, state: dict[str, Any]) -> None:
path = state_path_for(repo_dir)
path.parent.mkdir(parents=True, exist_ok=True)
temp_path = path.with_suffix(path.suffix + ".tmp")
temp_path.write_text(
json.dumps(state, indent=2, sort_keys=True) + "\n", encoding="utf-8"
)
temp_path.replace(path)
[docs]
def snapshot_project_artifacts(repo_dir: Path) -> dict[str, dict[str, Any]]:
projects_dir = repo_dir / EXPLOOP_MEMORY_DIRNAME
if not projects_dir.is_dir():
return {}
memory_rel = f"{EXPLOOP_MEMORY_DIRNAME}/{EXPLOOP_MEMORY_FILENAME}"
snapshot: dict[str, dict[str, Any]] = {}
for path in sorted(projects_dir.rglob("*")):
if not path.is_file():
continue
try:
rel = path.relative_to(repo_dir).as_posix()
except ValueError:
continue
if rel == memory_rel:
continue
try:
stat = path.stat()
except OSError:
continue
snapshot[rel] = {
"size_bytes": int(stat.st_size),
"mtime_ns": int(stat.st_mtime_ns),
"modified_utc": _mtime_utc(path),
}
return snapshot
[docs]
def detect_artifact_changes(
previous: dict[str, Any],
current: dict[str, dict[str, Any]],
) -> list[dict[str, Any]]:
known = previous.get("known_artifacts")
if not isinstance(known, dict):
known = {}
changes: list[dict[str, Any]] = []
for rel, record in sorted(current.items()):
old = known.get(rel)
if not isinstance(old, dict):
changes.append({"path": rel, "status": "new", **record})
continue
if old.get("size_bytes") != record.get("size_bytes") or old.get(
"mtime_ns"
) != record.get("mtime_ns"):
changes.append({"path": rel, "status": "modified", **record})
return changes
[docs]
def record_artifact_changes(repo_dir: Path, memory_path: Path) -> list[dict[str, Any]]:
"""Scan `projects/`, append new/modified artifacts to memory, and save state."""
state = load_state(repo_dir)
current = snapshot_project_artifacts(repo_dir)
first_scan = not bool(state.get("last_scan_at_utc"))
changes = [] if first_scan else detect_artifact_changes(state, current)
if changes:
lines = [
(
f"- {item['path']} | {item['status']} | "
f"{item['size_bytes']} bytes | modified {item['modified_utc']}"
)
for item in changes
]
append_to_memory_section(
memory_path,
heading="### Measurement data inventory",
lines=lines,
)
append_to_memory_section(
memory_path,
heading="### Progress log",
lines=[
f"- observed {len(changes)} new/modified measurement artifact(s) before agent turn"
],
)
state["known_artifacts"] = current
state["last_scan_at_utc"] = _utc_now_z()
save_state(repo_dir, state)
return changes