slovo/benchmarks/runner.py

697 lines
26 KiB
Python

#!/usr/bin/env python3
"""Shared local benchmark runner for Glagol benchmark scaffolds."""
from __future__ import annotations
import argparse
import json
import os
import shutil
import statistics
import subprocess
import sys
import time
from dataclasses import dataclass
from pathlib import Path
from typing import Callable
# Scope label attached to every listing and result this runner emits.
TIMING_SCOPE = "local-machine comparison only"
# Values accepted by --mode; "cold-process" is the --mode default.
TIMING_MODES = ["cold-process", "hot-loop"]
# Suite identifier reported by --suite-list.
SUITE_NAME = "glagol-local-benchmark-suite"
# Disclaimer printed (and included in JSON) by the suite listing.
LOCAL_TIMING_DISCLAIMER = (
"Local timing comparison only; not a published benchmark result and not a cross-machine performance claim."
)
# Paths, relative to a benchmark directory, whose presence the suite listing checks.
REQUIRED_BENCHMARK_FILES = ["benchmark.json", "run.py", "slovo.toml", "src/main.slo"]
# Implementation names every benchmark is expected to provide; absent ones are
# reported as missing slots by the suite listing.
EXPECTED_IMPLEMENTATION_NAMES = ["slovo", "c", "rust", "python", "clojure", "common_lisp"]
@dataclass(frozen=True)
class BenchmarkSpec:
    """Immutable description of one benchmark, as loaded from ``benchmark.json``."""

    # Benchmark name; also embedded in the file names of built executables.
    name: str
    # File stem shared by the per-language source files (``<stem>.c``, ``<stem>.rs``, ...).
    source_stem: str
    # Loop count for cold-process runs and the base used to normalize hot-loop timings.
    loop_count: int
    # Value the last non-empty stdout line must equal in cold-process mode.
    expected_checksum: str
    # Text piped to the program's stdin in cold-process mode.
    stdin_text: str
    # Loop count, expected checksum, and stdin text used in hot-loop mode.
    hot_loop_count: int
    hot_expected_checksum: str
    hot_stdin_text: str
    # Extra command-line arguments appended to every run command.
    run_args: list[str]
@dataclass(frozen=True)
class RunParameters:
    """Immutable settings for one timing mode, derived from a ``BenchmarkSpec``."""

    # Timing mode these settings belong to ("cold-process" or "hot-loop").
    mode: str
    # Loop count actually used for the run in this mode.
    loop_count: int
    # Value the last non-empty stdout line must equal.
    expected_checksum: str
    # Text piped to the program's stdin.
    stdin_text: str
    # Cold-process loop count; denominator of the normalization factor.
    base_loop_count: int
@dataclass(frozen=True)
class Implementation:
    """One language implementation of a benchmark plus the hooks that build and run it."""

    # Short identifier used by --only and in results (for example "c" or "rust").
    name: str
    # Human-readable language label.
    language: str
    # Path of the implementation's source file.
    source: Path
    # Called as build(root, spec, args); returns (build_command, skip_reasons).
    # A non-empty skip_reasons list means the implementation is skipped; an
    # empty build_command means there is no build step.
    build: Callable[[Path, BenchmarkSpec, argparse.Namespace], tuple[list[str], list[str] | None]]
    # Called as run(root, spec, args); returns the command line to time.
    run: Callable[[Path, BenchmarkSpec, argparse.Namespace], list[str]]
def main(root: Path, argv: list[str]) -> int:
    """Run the benchmark CLI for the directory *root* and return an exit code.

    If ``--suite-list`` appears in *argv*, control is handed to ``main_suite``.
    Otherwise *root* must contain ``benchmark.json``; when it does not, argparse
    reports a usage error, which exits the process.

    Returns:
        0 after ``--list``, ``--dry-run``, or a run with no failed result;
        1 when at least one result has status ``"failed"``.
    """
    if "--suite-list" in argv:
        return main_suite(root, argv)

    if not (root / "benchmark.json").is_file():
        # This parser exists only to produce a usage error listing the flags
        # that are valid at a suite root.
        suite_parser = argparse.ArgumentParser(description="Shared local Glagol benchmark runner.")
        suite_parser.add_argument("--suite-list", action="store_true", help="list suite metadata and exit")
        suite_parser.add_argument("--json", action="store_true", help="emit JSON for --suite-list")
        suite_parser.error("run from a benchmark run.py, or pass --suite-list at the benchmark suite root")

    spec = read_spec(root)
    implementations = available_implementations(root, spec)
    implementation_names = [impl.name for impl in implementations]

    parser = argparse.ArgumentParser(description=f"Run local {spec.name} timing comparisons.")
    for flag, help_text in (
        ("--list", "print benchmark metadata and exit"),
        ("--json", "emit JSON for --list or results"),
        ("--dry-run", "print planned commands without running them"),
    ):
        parser.add_argument(flag, action="store_true", help=help_text)
    parser.add_argument(
        "--mode",
        choices=TIMING_MODES,
        default="cold-process",
        help="cold-process measures one normal run; hot-loop uses an amplified loop count and normalized results",
    )
    parser.add_argument("--only", choices=implementation_names, action="append")
    parser.add_argument("--repeats", type=positive_int, default=5)
    parser.add_argument("--warmups", type=non_negative_int, default=1)
    for flag, help_text in (
        ("--glagol", "path to the glagol compiler binary"),
        ("--cc", "path to the C compiler"),
        ("--clojure", "path to the clojure command"),
        ("--clojure-jar", "path to a clojure jar for `java -cp ... clojure.main`"),
        ("--sbcl", "path to the SBCL executable for Common Lisp comparisons"),
    ):
        parser.add_argument(flag, help=help_text)
    args = parser.parse_args(argv)

    chosen = select_implementations(implementations, args.only)
    if args.list:
        emit_list(root, spec, chosen, args.json)
        return 0
    if args.dry_run:
        emit_dry_run(root, spec, chosen, args)
        return 0

    results = run_benchmarks(root, spec, chosen, args)
    emit_results(spec, results, args.json)
    any_failed = any(result["status"] == "failed" for result in results)
    return 1 if any_failed else 0
def main_suite(root: Path, argv: list[str]) -> int:
    """Handle ``--suite-list``: print the suite catalog and return exit code 0.

    *root* may be a benchmark directory or the suite root; it is mapped to the
    suite root by ``resolve_suite_root``. A usage error (which exits the
    process) is raised if ``--suite-list`` is not among the parsed flags.
    """
    parser = argparse.ArgumentParser(description="List local Glagol benchmark suite metadata.")
    for flag, help_text in (
        ("--suite-list", "list suite metadata and exit"),
        ("--json", "emit JSON for suite metadata"),
    ):
        parser.add_argument(flag, action="store_true", help=help_text)
    options = parser.parse_args(argv)
    if not options.suite_list:
        parser.error("pass --suite-list to list benchmark suite metadata")

    catalog = build_suite_catalog(resolve_suite_root(root))
    emit_suite_list(catalog, options.json)
    return 0
def resolve_suite_root(root: Path) -> Path:
    """Return the suite directory for *root*.

    A directory that contains ``benchmark.json`` is treated as a single
    benchmark, so its parent is the suite root; any other directory is
    returned unchanged.
    """
    is_benchmark_dir = (root / "benchmark.json").is_file()
    return root.parent if is_benchmark_dir else root
def read_spec(root: Path) -> BenchmarkSpec:
    """Load ``root/benchmark.json`` (UTF-8) into a ``BenchmarkSpec``.

    Required keys: ``loop_count``, ``benchmark``, ``source_stem``,
    ``expected_checksum``. Optional keys and their defaults:

    * ``stdin``: the loop count followed by a newline
    * ``hot_loop_count``: ``loop_count``
    * ``hot_expected_checksum``: ``expected_checksum``
    * ``hot_stdin``: the hot loop count followed by a newline
    * ``run_args``: an empty list

    Raises:
        KeyError: a required key is missing.
        ValueError: a loop count cannot be converted to ``int`` (this also
            covers malformed JSON, since ``json.JSONDecodeError`` derives
            from ``ValueError``).
    """
    raw_text = (root / "benchmark.json").read_text(encoding="utf-8")
    data = json.loads(raw_text)

    loop_count = int(data["loop_count"])
    name = str(data["benchmark"])
    source_stem = str(data["source_stem"])
    cold_checksum = str(data["expected_checksum"])
    cold_stdin = str(data.get("stdin", f"{loop_count}\n"))
    hot_loop_count = int(data.get("hot_loop_count", loop_count))
    hot_checksum = str(data.get("hot_expected_checksum", cold_checksum))
    hot_stdin = str(data.get("hot_stdin", f"{hot_loop_count}\n"))
    run_args = [str(item) for item in data.get("run_args", [])]

    return BenchmarkSpec(
        name=name,
        source_stem=source_stem,
        loop_count=loop_count,
        expected_checksum=cold_checksum,
        stdin_text=cold_stdin,
        hot_loop_count=hot_loop_count,
        hot_expected_checksum=hot_checksum,
        hot_stdin_text=hot_stdin,
        run_args=run_args,
    )
def available_implementations(root: Path, spec: BenchmarkSpec) -> list[Implementation]:
    """Return the implementations whose source file exists under *root*.

    The Slovo source is always ``src/main.slo``; every other language keeps a
    file named after ``spec.source_stem`` in its own subdirectory. The result
    keeps the fixed order slovo, c, rust, python, clojure, common_lisp.
    """
    stem = spec.source_stem
    slots = (
        ("slovo", "Slovo", root / "src" / "main.slo", slovo_build, slovo_run),
        ("c", "C", root / "c" / f"{stem}.c", c_build, c_run),
        ("rust", "Rust", root / "rust" / f"{stem}.rs", rust_build, rust_run),
        ("python", "Python", root / "python" / f"{stem}.py", python_build, python_run),
        ("clojure", "Clojure", root / "clojure" / f"{stem}.clj", clojure_build, clojure_run),
        (
            "common_lisp",
            "Common Lisp (SBCL)",
            root / "common-lisp" / f"{stem}.lisp",
            common_lisp_build,
            common_lisp_run,
        ),
    )
    present: list[Implementation] = []
    for name, language, source, build, run in slots:
        if source.is_file():
            present.append(Implementation(name, language, source, build, run))
    return present
def slovo_compiler(root: Path, args: argparse.Namespace) -> str | None:
    """Locate the glagol compiler, or return ``None`` if none is found.

    Lookup order:

    1. ``args.glagol`` (the ``--glagol`` option), if non-empty
    2. the ``GLAGOL`` environment variable, if non-empty
    3. ``compiler/target/debug/glagol`` two directory levels above *root*,
       if that file exists
    4. ``glagol`` on ``PATH``

    Args:
        root: Benchmark directory.
        args: Parsed options; only ``args.glagol`` is read.
    """
    explicit = args.glagol or os.environ.get("GLAGOL")
    if explicit:
        return explicit

    # A root with fewer than two ancestors (for example "/bench") has no
    # grandparent directory; skip the in-repo probe instead of letting
    # ``parents[1]`` raise IndexError, so the PATH lookup still runs.
    ancestors = root.parents
    if len(ancestors) > 1:
        candidate = ancestors[1] / "compiler" / "target" / "debug" / executable("glagol")
        if candidate.is_file():
            return str(candidate)
    return shutil.which("glagol")
def executable(name: str) -> str:
    """Return *name* with ``.exe`` appended on Windows (``os.name == "nt"``), else unchanged."""
    suffix = ".exe" if os.name == "nt" else ""
    return name + suffix
def slovo_build(root: Path, spec: BenchmarkSpec, args: argparse.Namespace) -> tuple[list[str], list[str] | None]:
    """Plan the Slovo build.

    Returns:
        ``(command, None)`` when a glagol compiler is available and clang is
        either configured through ``GLAGOL_CLANG`` or found on ``PATH``;
        otherwise ``([], [reason])`` so the implementation is skipped.
    """
    compiler = slovo_compiler(root, args)
    if compiler is None:
        return [], ["missing glagol compiler; set GLAGOL or pass --glagol"]

    clang_available = os.environ.get("GLAGOL_CLANG") is not None or shutil.which("clang") is not None
    if not clang_available:
        return [], ["missing clang for glagol build; set GLAGOL_CLANG"]

    binary = build_dir(root) / executable(f"slovo-{spec.name}")
    command = [compiler, "build", str(root), "-o", str(binary)]
    return command, None
def slovo_run(root: Path, spec: BenchmarkSpec, _args: argparse.Namespace) -> list[str]:
    """Return the command line for the built Slovo binary.

    Unlike the other runners, the loop count for the selected mode
    (``_args.mode``) is also appended as the final command-line argument.
    """
    loop_count = run_parameters(spec, _args.mode).loop_count
    binary = build_dir(root) / executable(f"slovo-{spec.name}")
    return [str(binary), *spec.run_args, str(loop_count)]
def c_build(root: Path, spec: BenchmarkSpec, args: argparse.Namespace) -> tuple[list[str], list[str] | None]:
    """Plan the C build (``-O2 -std=c11``).

    The compiler is taken from ``args.cc``, then the ``CC`` environment
    variable, then the first of clang / cc / gcc found on ``PATH``. Returns
    ``([], [reason])`` when none is available.
    """
    compiler = args.cc
    if not compiler:
        compiler = os.environ.get("CC")
    if not compiler:
        compiler = first_available(["clang", "cc", "gcc"])
    if compiler is None:
        return [], ["missing C compiler; set CC or pass --cc"]

    binary = build_dir(root) / executable(f"c-{spec.name}")
    source = root / "c" / f"{spec.source_stem}.c"
    return [compiler, "-O2", "-std=c11", str(source), "-o", str(binary)], None
def c_run(root: Path, spec: BenchmarkSpec, _args: argparse.Namespace) -> list[str]:
    """Return the command line for the built C binary followed by ``spec.run_args``."""
    binary = build_dir(root) / executable(f"c-{spec.name}")
    command = [str(binary)]
    command.extend(spec.run_args)
    return command
def rust_build(root: Path, spec: BenchmarkSpec, _args: argparse.Namespace) -> tuple[list[str], list[str] | None]:
    """Plan the Rust build (``opt-level=3``, ``debuginfo=0``) using ``rustc`` from ``PATH``.

    Returns ``([], ["missing rustc"])`` when ``rustc`` is not found.
    """
    rustc = first_available(["rustc"])
    if rustc is None:
        return [], ["missing rustc"]

    binary = build_dir(root) / executable(f"rust-{spec.name}")
    source = root / "rust" / f"{spec.source_stem}.rs"
    codegen_flags = ["-C", "opt-level=3", "-C", "debuginfo=0"]
    return [rustc, *codegen_flags, str(source), "-o", str(binary)], None
def rust_run(root: Path, spec: BenchmarkSpec, _args: argparse.Namespace) -> list[str]:
    """Return the command line for the built Rust binary followed by ``spec.run_args``."""
    binary = build_dir(root) / executable(f"rust-{spec.name}")
    command = [str(binary)]
    command.extend(spec.run_args)
    return command
def python_build(_root: Path, _spec: BenchmarkSpec, _args: argparse.Namespace) -> tuple[list[str], list[str] | None]:
    """Report that the Python implementation has no build step and is never skipped."""
    no_build_command: list[str] = []
    return no_build_command, None
def python_run(root: Path, spec: BenchmarkSpec, _args: argparse.Namespace) -> list[str]:
    """Return the command that runs the Python benchmark script.

    The interpreter is the one running this runner (``sys.executable``).
    """
    script = root / "python" / f"{spec.source_stem}.py"
    command = [sys.executable, str(script)]
    command.extend(spec.run_args)
    return command
def clojure_build(_root: Path, _spec: BenchmarkSpec, args: argparse.Namespace) -> tuple[list[str], list[str] | None]:
    """Report that Clojure has no build step, or a skip reason when no Clojure launcher is available."""
    if clojure_command(args) is not None:
        return [], None
    return [], ["missing clojure command; set CLOJURE, pass --clojure, or set CLOJURE_JAR"]
def clojure_run(root: Path, spec: BenchmarkSpec, args: argparse.Namespace) -> list[str]:
    """Return the command that runs the Clojure benchmark script.

    Callers invoke ``clojure_build`` first and skip the implementation when no
    launcher exists, so a missing launcher here is an internal error.
    """
    launcher = clojure_command(args)
    script = str(root / "clojure" / f"{spec.source_stem}.clj")
    assert launcher is not None
    return [*launcher, script, *spec.run_args]
def clojure_command(args: argparse.Namespace) -> list[str] | None:
    """Return the command prefix that launches Clojure, or ``None`` if unavailable.

    Lookup order: ``args.clojure``, the ``CLOJURE`` environment variable, then
    ``clojure`` on ``PATH``. Failing those, a jar from ``args.clojure_jar`` or
    ``CLOJURE_JAR`` combined with ``java`` on ``PATH`` yields
    ``java -cp <jar> clojure.main``.
    """
    launcher = args.clojure or os.environ.get("CLOJURE") or shutil.which("clojure")
    if launcher:
        return [launcher]

    jar = args.clojure_jar or os.environ.get("CLOJURE_JAR")
    java = shutil.which("java")
    if not (jar and java):
        return None
    return [java, "-cp", jar, "clojure.main"]
def common_lisp_build(_root: Path, _spec: BenchmarkSpec, args: argparse.Namespace) -> tuple[list[str], list[str] | None]:
    """Report that Common Lisp has no build step, or a skip reason when SBCL is unavailable."""
    if sbcl_command(args) is not None:
        return [], None
    return [], ["missing SBCL; set SBCL or pass --sbcl"]
def common_lisp_run(root: Path, spec: BenchmarkSpec, args: argparse.Namespace) -> list[str]:
    """Return the SBCL command that runs the Common Lisp benchmark as a script.

    Callers invoke ``common_lisp_build`` first and skip the implementation when
    SBCL is missing, so a missing executable here is an internal error.
    """
    sbcl = sbcl_command(args)
    assert sbcl is not None
    script = root / "common-lisp" / f"{spec.source_stem}.lisp"
    sbcl_flags = ["--noinform", "--disable-debugger", "--script"]
    return [sbcl, *sbcl_flags, str(script), *spec.run_args]
def sbcl_command(args: argparse.Namespace) -> str | None:
    """Return the SBCL executable to use, or ``None`` if none is found.

    Lookup order: ``args.sbcl``, the ``SBCL`` environment variable, then
    ``sbcl`` on ``PATH``. Empty values are treated as unset.
    """
    return args.sbcl or os.environ.get("SBCL") or shutil.which("sbcl")
def build_dir(root: Path) -> Path:
    """Return the ``build`` directory under *root* (the path only; nothing is created)."""
    return root.joinpath("build")
def positive_int(value: str) -> int:
    """Argparse ``type`` converter accepting integers strictly greater than zero.

    Raises:
        argparse.ArgumentTypeError: the parsed integer is zero or negative.
        ValueError: *value* is not an integer literal.
    """
    number = int(value)
    if number > 0:
        return number
    raise argparse.ArgumentTypeError("value must be greater than zero")
def non_negative_int(value: str) -> int:
    """Argparse ``type`` converter accepting integers greater than or equal to zero.

    Raises:
        argparse.ArgumentTypeError: the parsed integer is negative.
        ValueError: *value* is not an integer literal.
    """
    number = int(value)
    if number >= 0:
        return number
    raise argparse.ArgumentTypeError("value must be zero or greater")
def select_implementations(implementations: list[Implementation], names: list[str] | None) -> list[Implementation]:
    """Filter *implementations* down to those named in *names*, keeping their order.

    When *names* is ``None`` or empty, the original list object is returned
    unchanged.
    """
    if not names:
        return implementations
    wanted = frozenset(names)
    chosen: list[Implementation] = []
    for impl in implementations:
        if impl.name in wanted:
            chosen.append(impl)
    return chosen
def build_suite_catalog(suite_root: Path) -> dict[str, object]:
    """Collect metadata for every benchmark under *suite_root* plus a verification summary.

    The ``verification`` entry lists required files that are absent
    (``"<directory>/<path>"``) and expected implementations that have no
    source (``"<directory>:<name>"``); its status is ``"ok"`` only when both
    lists are empty, otherwise ``"incomplete"``.
    """
    benchmarks: list[dict[str, object]] = []
    missing_files: list[str] = []
    missing_slots: list[str] = []
    slot_total = 0

    for benchmark_root in suite_benchmark_roots(suite_root):
        spec = read_spec(benchmark_root)
        implementations = available_implementations(benchmark_root, spec)
        slot_total += len(implementations)
        entry = suite_benchmark_metadata(suite_root, benchmark_root, spec, implementations)
        benchmarks.append(entry)

        directory = str(entry["directory"])
        for file_entry in entry["required_files"]:
            assert isinstance(file_entry, dict)
            if file_entry["status"] == "present":
                continue
            missing_files.append(f"{directory}/{file_entry['path']}")

        present_names = {
            str(slot["name"])
            for slot in entry["implementation_slots"]
            if isinstance(slot, dict)
        }
        missing_slots.extend(
            f"{directory}:{expected}"
            for expected in EXPECTED_IMPLEMENTATION_NAMES
            if expected not in present_names
        )

    benchmark_total = len(benchmarks)
    complete = not missing_files and not missing_slots
    verification = {
        "status": "ok" if complete else "incomplete",
        "benchmark_metadata_files": benchmark_total,
        "required_files": benchmark_total * len(REQUIRED_BENCHMARK_FILES),
        "missing_required_files": missing_files,
        "implementation_slots": slot_total,
        "expected_implementation_slots": benchmark_total * len(EXPECTED_IMPLEMENTATION_NAMES),
        "missing_implementation_slots": missing_slots,
    }
    return {
        "suite": SUITE_NAME,
        "timing_scope": TIMING_SCOPE,
        "timing_modes": TIMING_MODES,
        "timing_disclaimer": LOCAL_TIMING_DISCLAIMER,
        "benchmark_count": benchmark_total,
        "benchmarks": benchmarks,
        "verification": verification,
    }
def suite_benchmark_roots(suite_root: Path) -> list[Path]:
    """Return the immediate subdirectories of *suite_root* that contain ``benchmark.json``, sorted by name."""
    roots: list[Path] = []
    for entry in suite_root.iterdir():
        if entry.is_dir() and (entry / "benchmark.json").is_file():
            roots.append(entry)
    roots.sort(key=lambda entry: entry.name)
    return roots
def suite_benchmark_metadata(
    suite_root: Path,
    root: Path,
    spec: BenchmarkSpec,
    implementations: list[Implementation],
) -> dict[str, object]:
    """Build the suite-catalog entry for the benchmark stored in *root*.

    Paths in the entry (the benchmark directory and implementation sources)
    are expressed relative to *suite_root*. Each required file is reported as
    ``"present"`` or ``"missing"``.
    """
    required_files = [
        {
            "path": relative,
            "status": "present" if (root / relative).is_file() else "missing",
        }
        for relative in REQUIRED_BENCHMARK_FILES
    ]
    checksum_metadata = {
        "cold_process": {
            "expected_checksum": spec.expected_checksum,
            "stdin": spec.stdin_text,
        },
        "hot_loop": {
            "expected_checksum": spec.hot_expected_checksum,
            "stdin": spec.hot_stdin_text,
        },
    }
    implementation_slots = [
        {
            "name": impl.name,
            "language": impl.language,
            "source": str(impl.source.relative_to(suite_root)),
        }
        for impl in implementations
    ]
    return {
        "name": spec.name,
        "directory": str(root.relative_to(suite_root)),
        "source_stem": spec.source_stem,
        "timing_modes": TIMING_MODES,
        "loop_count_source": "stdin",
        "loop_count": spec.loop_count,
        "hot_loop_count": spec.hot_loop_count,
        "expected_checksum": spec.expected_checksum,
        "hot_expected_checksum": spec.hot_expected_checksum,
        "required_files": required_files,
        "checksum_metadata": checksum_metadata,
        "run_args": spec.run_args,
        "implementation_slots": implementation_slots,
    }
def emit_suite_list(metadata: dict[str, object], as_json: bool) -> None:
    """Print the suite catalog to stdout.

    With *as_json* the whole mapping is dumped as indented, key-sorted JSON.
    Otherwise a text report is printed: suite header, verification summary,
    then one section per benchmark with its required files and implementations.
    """
    if as_json:
        print(json.dumps(metadata, indent=2, sort_keys=True))
        return

    print(f"{metadata['suite']}: {metadata['timing_scope']}")
    print(str(metadata["timing_disclaimer"]))
    print(f"benchmark_count={metadata['benchmark_count']}")
    print(f"timing_modes={','.join(TIMING_MODES)}")

    verification = metadata["verification"]
    assert isinstance(verification, dict)
    for key in ("status", "required_files", "implementation_slots"):
        label = "verification_status" if key == "status" else key
        print(f"{label}={verification[key]}")

    print("benchmarks:")
    for benchmark in metadata["benchmarks"]:
        assert isinstance(benchmark, dict)
        print(
            f" {benchmark['name']} ({benchmark['directory']}): "
            f"loop_count={benchmark['loop_count']} hot_loop_count={benchmark['hot_loop_count']} "
            f"expected_checksum={benchmark['expected_checksum']} "
            f"hot_expected_checksum={benchmark['hot_expected_checksum']}"
        )
        print(" required_files:")
        for file_entry in benchmark["required_files"]:
            assert isinstance(file_entry, dict)
            print(f" {file_entry['path']}: {file_entry['status']}")
        print(" implementations:")
        for slot in benchmark["implementation_slots"]:
            assert isinstance(slot, dict)
            print(f" {slot['name']}: {slot['language']} ({slot['source']})")
def emit_list(root: Path, spec: BenchmarkSpec, implementations: list[Implementation], as_json: bool) -> None:
    """Print metadata for one benchmark and its implementations to stdout.

    Source paths are shown relative to *root*. With *as_json* the metadata is
    dumped as indented, key-sorted JSON; otherwise a ``key=value`` text listing
    is printed, with ``run_args`` shown only when non-empty.
    """
    implementation_rows = [
        {"name": impl.name, "language": impl.language, "source": str(impl.source.relative_to(root))}
        for impl in implementations
    ]
    metadata = {
        "benchmark": spec.name,
        "loop_count": spec.loop_count,
        "hot_loop_count": spec.hot_loop_count,
        "expected_checksum": spec.expected_checksum,
        "hot_expected_checksum": spec.hot_expected_checksum,
        "timing_scope": TIMING_SCOPE,
        "timing_modes": TIMING_MODES,
        "loop_count_source": "stdin",
        "run_args": spec.run_args,
        "implementations": implementation_rows,
    }
    if as_json:
        print(json.dumps(metadata, indent=2, sort_keys=True))
        return

    print(f"{spec.name}: {TIMING_SCOPE}")
    print(f"loop_count={spec.loop_count}")
    print(f"hot_loop_count={spec.hot_loop_count}")
    print("loop_count_source=stdin")
    if spec.run_args:
        joined_args = " ".join(spec.run_args)
        print(f"run_args={joined_args}")
    print(f"expected_checksum={spec.expected_checksum}")
    print("implementations:")
    for impl in implementations:
        relative_source = impl.source.relative_to(root)
        print(f" {impl.name}: {impl.language} ({relative_source})")
def emit_dry_run(root: Path, spec: BenchmarkSpec, implementations: list[Implementation], args: argparse.Namespace) -> None:
    """Print the build and run commands that a real run would execute, without executing them.

    For each implementation either the skip reasons are printed, or the build
    command (``none`` when there is no build step), the stdin text with
    trailing whitespace removed, and the run command.
    """
    params = run_parameters(spec, args.mode)
    print(f"{spec.name}: {TIMING_SCOPE}")
    print(f"mode={params.mode}")
    print(f"loop_count={params.loop_count}")
    print(f"expected_checksum={params.expected_checksum}")

    stdin_preview = params.stdin_text.rstrip()
    for impl in implementations:
        build_command, skip_reasons = impl.build(root, spec, args)
        print(f"{impl.name}:")
        if skip_reasons:
            print(f" skip: {'; '.join(skip_reasons)}")
            continue
        build_text = format_command(build_command) if build_command else "none"
        print(f" build: {build_text}")
        print(f" stdin: {stdin_preview}")
        print(f" run: {format_command(impl.run(root, spec, args))}")
def run_benchmarks(root: Path, spec: BenchmarkSpec, implementations: list[Implementation], args: argparse.Namespace) -> list[dict[str, object]]:
    """Create the build directory if needed, then benchmark each implementation in order.

    Returns one result mapping per implementation, as produced by ``run_one``.
    """
    build_dir(root).mkdir(exist_ok=True)
    results: list[dict[str, object]] = []
    for impl in implementations:
        results.append(run_one(root, spec, impl, args))
    return results
def run_one(root: Path, spec: BenchmarkSpec, impl: Implementation, args: argparse.Namespace) -> dict[str, object]:
    """Build, warm up, and time a single implementation.

    Each timed sample is the wall-clock duration of one complete child-process
    run, measured with ``time.perf_counter_ns`` around the subprocess call.

    Returns:
        A ``"skipped"`` result when the build hook reports skip reasons, a
        ``"failed"`` result when the build, a warmup, or a timed run fails,
        and otherwise an ``"ok"`` result carrying min/median/max totals in
        milliseconds plus the same values divided by
        ``loop_count / base_loop_count``.
    """
    params = run_parameters(spec, args.mode)

    build_command, skip_reasons = impl.build(root, spec, args)
    if skip_reasons:
        return skipped_result(impl, skip_reasons)
    if build_command:
        build_process = run_command(build_command)
        if build_process.returncode != 0:
            return failed_result(impl, "build failed", build_process)

    command = impl.run(root, spec, args)

    # Warmup runs are validated but not timed.
    for _ in range(args.warmups):
        warmup_process = run_command(command, params.stdin_text)
        if not run_succeeded_for_params(warmup_process, params):
            return failed_result(impl, "warmup failed", warmup_process)

    samples_ns: list[int] = []
    for _ in range(args.repeats):
        started = time.perf_counter_ns()
        timed_process = run_command(command, params.stdin_text)
        finished = time.perf_counter_ns()
        if not run_succeeded_for_params(timed_process, params):
            return failed_result(impl, "run failed", timed_process)
        samples_ns.append(finished - started)

    totals_ms = {
        "min_ms": ns_to_ms(min(samples_ns)),
        "median_ms": ns_to_ms(int(statistics.median(samples_ns))),
        "max_ms": ns_to_ms(max(samples_ns)),
    }
    scale = params.loop_count / params.base_loop_count

    result: dict[str, object] = {
        "name": impl.name,
        "language": impl.language,
        "status": "ok",
        "timing_mode": params.mode,
        "loop_count": params.loop_count,
        "base_loop_count": params.base_loop_count,
        "normalization_factor": scale,
        "repeats": args.repeats,
        "warmups": args.warmups,
        "checksum": params.expected_checksum,
    }
    result.update(totals_ms)
    for key, total in totals_ms.items():
        result[f"normalized_{key}"] = total / scale
    result["timing_scope"] = TIMING_SCOPE
    return result
def skipped_result(impl: Implementation, reasons: list[str]) -> dict[str, object]:
    """Build the result mapping for an implementation that was skipped; reasons are joined with ``"; "``."""
    reason = "; ".join(reasons)
    return dict(name=impl.name, language=impl.language, status="skipped", reason=reason)
def failed_result(impl: Implementation, message: str, process: subprocess.CompletedProcess[str]) -> dict[str, object]:
    """Build the result mapping for a failed step, including the process exit code and captured output."""
    result: dict[str, object] = {"name": impl.name, "language": impl.language}
    result["status"] = "failed"
    result["reason"] = message
    result["returncode"] = process.returncode
    result["stdout"] = process.stdout
    result["stderr"] = process.stderr
    return result
def emit_results(spec: BenchmarkSpec, results: list[dict[str, object]], as_json: bool) -> None:
    """Print benchmark results to stdout as JSON or as one text line per implementation.

    In text mode the header mentions hot-loop details when the first ``"ok"``
    result was measured in hot-loop mode. Results that are not ``"ok"`` are
    printed with their status and reason.
    """
    if as_json:
        document = {
            "benchmark": spec.name,
            "base_loop_count": spec.loop_count,
            "timing_scope": TIMING_SCOPE,
            "results": results,
        }
        print(json.dumps(document, indent=2, sort_keys=True))
        return

    # The first successful result decides which header is shown.
    first_ok = next((result for result in results if result["status"] == "ok"), None)
    mode = str(first_ok["timing_mode"]) if first_ok is not None else "unknown"
    if mode == "hot-loop":
        loop_count = int(first_ok["loop_count"]) if first_ok is not None else spec.hot_loop_count
        print(
            f"{spec.name}: {TIMING_SCOPE} "
            f"(mode=hot-loop, loop_count={loop_count}, normalized_to={spec.loop_count})"
        )
    else:
        print(f"{spec.name}: {TIMING_SCOPE}")

    for result in results:
        status = result["status"]
        name = result["name"]
        if status != "ok":
            print(f"{name}: {status} ({result['reason']})")
            continue
        if result["timing_mode"] == "hot-loop":
            print(
                f"{name}: total_min={result['min_ms']:.3f}ms total_median={result['median_ms']:.3f}ms "
                f"total_max={result['max_ms']:.3f}ms normalized_median={result['normalized_median_ms']:.3f}ms"
            )
        else:
            print(
                f"{name}: min={result['min_ms']:.3f}ms median={result['median_ms']:.3f}ms "
                f"max={result['max_ms']:.3f}ms"
            )
def run_command(command: list[str], stdin_text: str | None = None) -> subprocess.CompletedProcess[str]:
    """Run *command* to completion, capturing stdout and stderr as text.

    Args:
        command: Program and arguments, executed directly (no shell).
        stdin_text: Text passed as the child's input; with ``None`` the
            child's stdin is not redirected.

    Returns:
        The completed process. A non-zero exit status is returned, not raised.
        If the program cannot be started at all (missing file, not executable,
        and similar ``OSError`` cases), a synthetic result with return code
        127, empty stdout, and the OS error text as stderr is returned.
    """
    try:
        return subprocess.run(
            command,
            input=stdin_text,
            text=True,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            check=False,
        )
    except OSError as error:
        # Report an unstartable executable like any other failed command, so
        # one bad tool path marks that implementation as failed instead of
        # aborting the whole run with a traceback.
        return subprocess.CompletedProcess(command, 127, stdout="", stderr=str(error))
def normalized_stdout(stdout: str) -> str:
    """Return the last non-blank line of *stdout*, stripped of surrounding whitespace.

    Returns an empty string when *stdout* has no non-blank line.
    """
    for raw_line in reversed(stdout.splitlines()):
        stripped = raw_line.strip()
        if stripped:
            return stripped
    return ""
def run_parameters(spec: BenchmarkSpec, mode: str) -> RunParameters:
    """Select the loop count, checksum, and stdin text that apply to *mode*.

    ``"hot-loop"`` selects the hot-loop fields of *spec*; any other value is
    treated as ``"cold-process"``. ``base_loop_count`` is always the
    cold-process loop count.
    """
    if mode != "hot-loop":
        return RunParameters(
            mode="cold-process",
            loop_count=spec.loop_count,
            expected_checksum=spec.expected_checksum,
            stdin_text=spec.stdin_text,
            base_loop_count=spec.loop_count,
        )
    return RunParameters(
        mode=mode,
        loop_count=spec.hot_loop_count,
        expected_checksum=spec.hot_expected_checksum,
        stdin_text=spec.hot_stdin_text,
        base_loop_count=spec.loop_count,
    )
def run_succeeded_for_params(process: subprocess.CompletedProcess[str], params: RunParameters) -> bool:
    """Decide whether a finished run counts as successful.

    The last non-blank stdout line must equal the expected checksum. In
    cold-process mode the exit status must also be zero; in hot-loop mode the
    exit status is not checked.
    """
    checksum_matches = normalized_stdout(process.stdout) == params.expected_checksum
    if not checksum_matches:
        return False
    return params.mode == "hot-loop" or process.returncode == 0
def ns_to_ms(value: int) -> float:
    """Convert a duration in nanoseconds to milliseconds."""
    nanoseconds_per_millisecond = 1e6
    return value / nanoseconds_per_millisecond
def first_available(candidates: list[str]) -> str | None:
    """Return the resolved path of the first candidate found by ``shutil.which``, or ``None``."""
    resolved = map(shutil.which, candidates)
    return next((path for path in resolved if path), None)
def format_command(command: list[str]) -> str:
    """Render *command* as a single space-separated string, quoting each part with ``shlex_quote``."""
    quoted_parts = [shlex_quote(part) for part in command]
    return " ".join(quoted_parts)
def shlex_quote(value: str) -> str:
    """Quote *value* for display in a shell-style command line.

    A non-empty value made only of alphanumeric characters and ``/._:-`` is
    returned unchanged. Anything else, including the empty string, is wrapped
    in single quotes, with each embedded single quote written as ``'"'"'``.
    """
    safe_punctuation = "/._:-"
    needs_quoting = not value or any(
        not (char.isalnum() or char in safe_punctuation) for char in value
    )
    if not needs_quoting:
        return value
    escaped = value.replace("'", "'\"'\"'")
    return f"'{escaped}'"
if __name__ == "__main__":
    # Use the directory containing this file as the root passed to main().
    _script_directory = Path(__file__).resolve().parent
    sys.exit(main(_script_directory, sys.argv[1:]))