#!/usr/bin/env python3
"""Shared local benchmark runner for Glagol benchmark scaffolds."""

from __future__ import annotations

import argparse
import json
import os
import shutil
import statistics
import subprocess
import sys
import time
from dataclasses import dataclass
from pathlib import Path
from typing import Callable

TIMING_SCOPE = "local-machine comparison only"
TIMING_MODES = ["cold-process", "hot-loop"]
SUITE_NAME = "glagol-local-benchmark-suite"
LOCAL_TIMING_DISCLAIMER = (
    "Local timing comparison only; not a published benchmark result and not a cross-machine performance claim."
)
REQUIRED_BENCHMARK_FILES = ["benchmark.json", "run.py", "slovo.toml", "src/main.slo"]
EXPECTED_IMPLEMENTATION_NAMES = ["slovo", "c", "rust", "python", "clojure", "common_lisp"]

# Return codes reported by run_command() when a process could not be started
# at all; they follow the POSIX shell convention for "cannot execute" and
# "command not found".
_EXIT_CANNOT_EXECUTE = 126
_EXIT_NOT_FOUND = 127


@dataclass(frozen=True)
class BenchmarkSpec:
    """Benchmark settings loaded from a benchmark directory's ``benchmark.json``."""

    name: str  # value of the "benchmark" key
    source_stem: str  # file stem shared by the per-language source files
    loop_count: int  # loop count for cold-process mode
    expected_checksum: str  # expected last stdout line in cold-process mode
    stdin_text: str  # text piped to the program in cold-process mode
    hot_loop_count: int  # loop count for hot-loop mode
    hot_expected_checksum: str  # expected last stdout line in hot-loop mode
    hot_stdin_text: str  # text piped to the program in hot-loop mode
    run_args: list[str]  # extra argv entries appended to every run command


@dataclass(frozen=True)
class RunParameters:
    """The subset of a BenchmarkSpec that applies to one timing mode."""

    mode: str  # "cold-process" or "hot-loop"
    loop_count: int  # loop count used by this mode
    expected_checksum: str  # checksum the run must print
    stdin_text: str  # text piped to the program
    base_loop_count: int  # cold-process loop count, used for normalization


@dataclass(frozen=True)
class Implementation:
    """One language implementation of a benchmark and how to build and run it."""

    name: str  # short identifier, e.g. "c" or "common_lisp"
    language: str  # display name
    source: Path  # path to the implementation's source file
    # Returns (build_command, skip_reasons). An empty build_command means no
    # build step is needed; non-empty skip_reasons means the implementation
    # cannot run on this machine.
    build: Callable[[Path, BenchmarkSpec, argparse.Namespace], tuple[list[str], list[str] | None]]
    # Returns the argv used to execute the implementation.
    run: Callable[[Path, BenchmarkSpec, argparse.Namespace], list[str]]


def main(root: Path, argv: list[str]) -> int:
    """Run the benchmark CLI for the benchmark directory ``root``.

    Args:
        root: Directory expected to contain ``benchmark.json`` (or the suite
            root when ``--suite-list`` is passed).
        argv: Command-line arguments without the program name.

    Returns:
        0 on success, 1 when at least one implementation failed.
    """
    if any(arg == "--suite-list" for arg in argv):
        return main_suite(root, argv)
    if not (root / "benchmark.json").is_file():
        # Build a minimal parser only so the error is reported through
        # argparse; parser.error() terminates the process.
        parser = argparse.ArgumentParser(description="Shared local Glagol benchmark runner.")
        parser.add_argument("--suite-list", action="store_true", help="list suite metadata and exit")
        parser.add_argument("--json", action="store_true", help="emit JSON for --suite-list")
        parser.error("run from a benchmark run.py, or pass --suite-list at the benchmark suite root")

    spec = read_spec(root)
    implementations = available_implementations(root, spec)

    parser = argparse.ArgumentParser(description=f"Run local {spec.name} timing comparisons.")
    parser.add_argument("--list", action="store_true", help="print benchmark metadata and exit")
    parser.add_argument("--json", action="store_true", help="emit JSON for --list or results")
    parser.add_argument("--dry-run", action="store_true", help="print planned commands without running them")
    parser.add_argument(
        "--mode",
        choices=TIMING_MODES,
        default="cold-process",
        help="cold-process measures one normal run; hot-loop uses an amplified loop count and normalized results",
    )
    parser.add_argument("--only", choices=[impl.name for impl in implementations], action="append")
    parser.add_argument("--repeats", type=positive_int, default=5)
    parser.add_argument("--warmups", type=non_negative_int, default=1)
    parser.add_argument("--glagol", help="path to the glagol compiler binary")
    parser.add_argument("--cc", help="path to the C compiler")
    parser.add_argument("--clojure", help="path to the clojure command")
    parser.add_argument("--clojure-jar", help="path to a clojure jar for `java -cp ... clojure.main`")
    parser.add_argument("--sbcl", help="path to the SBCL executable for Common Lisp comparisons")
    args = parser.parse_args(argv)

    selected = select_implementations(implementations, args.only)
    if args.list:
        emit_list(root, spec, selected, args.json)
        return 0
    if args.dry_run:
        emit_dry_run(root, spec, selected, args)
        return 0

    results = run_benchmarks(root, spec, selected, args)
    emit_results(spec, results, args.json)
    return 1 if any(result["status"] == "failed" for result in results) else 0


def main_suite(root: Path, argv: list[str]) -> int:
    """Handle ``--suite-list``: print metadata for every benchmark in the suite.

    Returns:
        Always 0; argument errors terminate through argparse instead.
    """
    parser = argparse.ArgumentParser(description="List local Glagol benchmark suite metadata.")
    parser.add_argument("--suite-list", action="store_true", help="list suite metadata and exit")
    parser.add_argument("--json", action="store_true", help="emit JSON for suite metadata")
    args = parser.parse_args(argv)
    if not args.suite_list:
        parser.error("pass --suite-list to list benchmark suite metadata")
    suite_root = resolve_suite_root(root)
    emit_suite_list(build_suite_catalog(suite_root), args.json)
    return 0


def resolve_suite_root(root: Path) -> Path:
    """Return the suite directory: the parent of ``root`` when ``root`` is itself a benchmark."""
    if (root / "benchmark.json").is_file():
        return root.parent
    return root


def read_spec(root: Path) -> BenchmarkSpec:
    """Load ``root/benchmark.json`` into a BenchmarkSpec.

    ``benchmark``, ``source_stem``, ``loop_count`` and ``expected_checksum``
    are required keys. Hot-loop values fall back to the cold-process values,
    and the stdin texts default to the matching loop count followed by a
    newline.

    Raises:
        OSError: The file cannot be read.
        json.JSONDecodeError: The file is not valid JSON.
        KeyError: A required key is missing.
        ValueError: A loop count is not an integer.
    """
    data = json.loads((root / "benchmark.json").read_text(encoding="utf-8"))
    loop_count = int(data["loop_count"])
    hot_loop_count = int(data.get("hot_loop_count", loop_count))
    return BenchmarkSpec(
        name=str(data["benchmark"]),
        source_stem=str(data["source_stem"]),
        loop_count=loop_count,
        expected_checksum=str(data["expected_checksum"]),
        stdin_text=str(data.get("stdin", f"{loop_count}\n")),
        hot_loop_count=hot_loop_count,
        hot_expected_checksum=str(data.get("hot_expected_checksum", data["expected_checksum"])),
        hot_stdin_text=str(data.get("hot_stdin", f"{hot_loop_count}\n")),
        run_args=[str(item) for item in data.get("run_args", [])],
    )


def available_implementations(root: Path, spec: BenchmarkSpec) -> list[Implementation]:
    """Return the known implementations whose source file exists under ``root``."""
    candidates = [
        Implementation("slovo", "Slovo", root / "src" / "main.slo", slovo_build, slovo_run),
        Implementation("c", "C", root / "c" / f"{spec.source_stem}.c", c_build, c_run),
        Implementation("rust", "Rust", root / "rust" / f"{spec.source_stem}.rs", rust_build, rust_run),
        Implementation("python", "Python", root / "python" / f"{spec.source_stem}.py", python_build, python_run),
        Implementation("clojure", "Clojure", root / "clojure" / f"{spec.source_stem}.clj", clojure_build, clojure_run),
        Implementation(
            "common_lisp",
            "Common Lisp (SBCL)",
            root / "common-lisp" / f"{spec.source_stem}.lisp",
            common_lisp_build,
            common_lisp_run,
        ),
    ]
    return [impl for impl in candidates if impl.source.is_file()]


def slovo_compiler(root: Path, args: argparse.Namespace) -> str | None:
    """Locate the glagol compiler.

    Lookup order: ``--glagol``, the ``GLAGOL`` environment variable, a debug
    build at ``<root>/../../compiler/target/debug``, then ``PATH``.

    Returns:
        The compiler path, or None when nothing was found.
    """
    if args.glagol:
        return args.glagol
    env_path = os.environ.get("GLAGOL")
    if env_path:
        return env_path
    # A root with fewer than two ancestors (for example a relative
    # single-component path) has no grandparent; skip the in-tree candidate
    # instead of raising IndexError.
    ancestors = root.parents
    if len(ancestors) > 1:
        candidate = ancestors[1] / "compiler" / "target" / "debug" / executable("glagol")
        if candidate.is_file():
            return str(candidate)
    return shutil.which("glagol")


def executable(name: str) -> str:
    """Return ``name`` with ``.exe`` appended when ``os.name`` is ``"nt"``."""
    return f"{name}.exe" if os.name == "nt" else name


def slovo_build(root: Path, spec: BenchmarkSpec, args: argparse.Namespace) -> tuple[list[str], list[str] | None]:
    """Return the glagol build command, or skip reasons when glagol or clang is missing."""
    compiler = slovo_compiler(root, args)
    if compiler is None:
        return [], ["missing glagol compiler; set GLAGOL or pass --glagol"]
    if os.environ.get("GLAGOL_CLANG") is None and shutil.which("clang") is None:
        return [], ["missing clang for glagol build; set GLAGOL_CLANG"]
    output = build_dir(root) / executable(f"slovo-{spec.name}")
    return [compiler, "build", str(root), "-o", str(output)], None


def slovo_run(root: Path, spec: BenchmarkSpec, _args: argparse.Namespace) -> list[str]:
    """Return the argv for the built Slovo binary.

    Unlike the other runners this one also passes the loop count as the final
    argument. Note that ``_args`` is read (for ``mode``) despite its
    underscore prefix; the name is kept for interface compatibility.
    """
    params = run_parameters(spec, _args.mode)
    return [str(build_dir(root) / executable(f"slovo-{spec.name}")), *spec.run_args, str(params.loop_count)]


def c_build(root: Path, spec: BenchmarkSpec, args: argparse.Namespace) -> tuple[list[str], list[str] | None]:
    """Return the C compile command, or a skip reason when no compiler is found.

    The compiler comes from ``--cc``, then ``CC``, then the first of
    clang/cc/gcc on ``PATH``.
    """
    compiler = args.cc or os.environ.get("CC") or first_available(["clang", "cc", "gcc"])
    if compiler is None:
        return [], ["missing C compiler; set CC or pass --cc"]
    output = build_dir(root) / executable(f"c-{spec.name}")
    return [compiler, "-O2", "-std=c11", str(root / "c" / f"{spec.source_stem}.c"), "-o", str(output)], None


def c_run(root: Path, spec: BenchmarkSpec, _args: argparse.Namespace) -> list[str]:
    """Return the argv for the built C binary."""
    return [str(build_dir(root) / executable(f"c-{spec.name}")), *spec.run_args]


def rust_build(root: Path, spec: BenchmarkSpec, _args: argparse.Namespace) -> tuple[list[str], list[str] | None]:
    """Return the rustc compile command, or a skip reason when rustc is not on ``PATH``."""
    rustc = first_available(["rustc"])
    if rustc is None:
        return [], ["missing rustc"]
    output = build_dir(root) / executable(f"rust-{spec.name}")
    return [
        rustc,
        "-C",
        "opt-level=3",
        "-C",
        "debuginfo=0",
        str(root / "rust" / f"{spec.source_stem}.rs"),
        "-o",
        str(output),
    ], None


def rust_run(root: Path, spec: BenchmarkSpec, _args: argparse.Namespace) -> list[str]:
    """Return the argv for the built Rust binary."""
    return [str(build_dir(root) / executable(f"rust-{spec.name}")), *spec.run_args]


def python_build(_root: Path, _spec: BenchmarkSpec, _args: argparse.Namespace) -> tuple[list[str], list[str] | None]:
    """Python needs no build step and is never skipped."""
    return [], None


def python_run(root: Path, spec: BenchmarkSpec, _args: argparse.Namespace) -> list[str]:
    """Return the argv that runs the Python source with the current interpreter."""
    return [sys.executable, str(root / "python" / f"{spec.source_stem}.py"), *spec.run_args]


def clojure_build(_root: Path, _spec: BenchmarkSpec, args: argparse.Namespace) -> tuple[list[str], list[str] | None]:
    """Clojure needs no build step; report a skip reason when no launcher is available."""
    if clojure_command(args) is None:
        return [], ["missing clojure command; set CLOJURE, pass --clojure, or set CLOJURE_JAR"]
    return [], None


def clojure_run(root: Path, spec: BenchmarkSpec, args: argparse.Namespace) -> list[str]:
    """Return the argv that runs the Clojure source.

    Callers are expected to have checked clojure_build() first, so a launcher
    is assumed to exist.
    """
    source = str(root / "clojure" / f"{spec.source_stem}.clj")
    command = clojure_command(args)
    assert command is not None
    return [*command, source, *spec.run_args]


def clojure_command(args: argparse.Namespace) -> list[str] | None:
    """Return the argv prefix that launches Clojure, or None when unavailable.

    Lookup order: ``--clojure``, the ``CLOJURE`` environment variable,
    ``clojure`` on ``PATH``, then ``java -cp <jar> clojure.main`` using
    ``--clojure-jar`` or ``CLOJURE_JAR``.
    """
    if args.clojure:
        return [args.clojure]
    env_path = os.environ.get("CLOJURE")
    if env_path:
        return [env_path]
    found = shutil.which("clojure")
    if found:
        return [found]
    jar = args.clojure_jar or os.environ.get("CLOJURE_JAR")
    java = shutil.which("java")
    if jar and java:
        return [java, "-cp", jar, "clojure.main"]
    return None


def common_lisp_build(_root: Path, _spec: BenchmarkSpec, args: argparse.Namespace) -> tuple[list[str], list[str] | None]:
    """Common Lisp needs no build step; report a skip reason when SBCL is unavailable."""
    if sbcl_command(args) is None:
        return [], ["missing SBCL; set SBCL or pass --sbcl"]
    return [], None


def common_lisp_run(root: Path, spec: BenchmarkSpec, args: argparse.Namespace) -> list[str]:
    """Return the argv that runs the Common Lisp source as an SBCL script.

    Callers are expected to have checked common_lisp_build() first, so SBCL
    is assumed to exist.
    """
    sbcl = sbcl_command(args)
    assert sbcl is not None
    return [
        sbcl,
        "--noinform",
        "--disable-debugger",
        "--script",
        str(root / "common-lisp" / f"{spec.source_stem}.lisp"),
        *spec.run_args,
    ]


def sbcl_command(args: argparse.Namespace) -> str | None:
    """Locate SBCL via ``--sbcl``, the ``SBCL`` environment variable, then ``PATH``."""
    if args.sbcl:
        return args.sbcl
    env_path = os.environ.get("SBCL")
    if env_path:
        return env_path
    return shutil.which("sbcl")


def build_dir(root: Path) -> Path:
    """Return the directory that holds built benchmark binaries."""
    return root / "build"


def positive_int(value: str) -> int:
    """argparse type callable accepting integers greater than zero.

    Raises:
        argparse.ArgumentTypeError: The value is zero or negative.
        ValueError: The value is not an integer (argparse reports this itself).
    """
    parsed = int(value)
    if parsed <= 0:
        raise argparse.ArgumentTypeError("value must be greater than zero")
    return parsed


def non_negative_int(value: str) -> int:
    """argparse type callable accepting integers greater than or equal to zero.

    Raises:
        argparse.ArgumentTypeError: The value is negative.
        ValueError: The value is not an integer (argparse reports this itself).
    """
    parsed = int(value)
    if parsed < 0:
        raise argparse.ArgumentTypeError("value must be zero or greater")
    return parsed


def select_implementations(implementations: list[Implementation], names: list[str] | None) -> list[Implementation]:
    """Filter ``implementations`` by ``names``, keeping the original order.

    An empty or missing ``names`` selects everything.
    """
    if not names:
        return implementations
    selected_names = set(names)
    return [impl for impl in implementations if impl.name in selected_names]


def build_suite_catalog(suite_root: Path) -> dict[str, object]:
    """Collect metadata for every benchmark under ``suite_root``.

    The returned mapping also carries a ``verification`` section listing
    required files and expected implementations that are missing; its status
    is ``"ok"`` only when both lists are empty.
    """
    benchmarks: list[dict[str, object]] = []
    implementation_slot_count = 0
    missing_required_files: list[str] = []
    missing_implementation_slots: list[str] = []

    for root in suite_benchmark_roots(suite_root):
        spec = read_spec(root)
        implementations = available_implementations(root, spec)
        implementation_slot_count += len(implementations)
        benchmark = suite_benchmark_metadata(suite_root, root, spec, implementations)
        benchmarks.append(benchmark)

        directory = str(benchmark["directory"])
        for required_file in benchmark["required_files"]:
            assert isinstance(required_file, dict)
            if required_file["status"] != "present":
                missing_required_files.append(f"{directory}/{required_file['path']}")

        present_implementations = {
            str(implementation["name"])
            for implementation in benchmark["implementation_slots"]
            if isinstance(implementation, dict)
        }
        for expected in EXPECTED_IMPLEMENTATION_NAMES:
            if expected not in present_implementations:
                missing_implementation_slots.append(f"{directory}:{expected}")

    return {
        "suite": SUITE_NAME,
        "timing_scope": TIMING_SCOPE,
        "timing_modes": TIMING_MODES,
        "timing_disclaimer": LOCAL_TIMING_DISCLAIMER,
        "benchmark_count": len(benchmarks),
        "benchmarks": benchmarks,
        "verification": {
            "status": "ok" if not missing_required_files and not missing_implementation_slots else "incomplete",
            "benchmark_metadata_files": len(benchmarks),
            "required_files": len(benchmarks) * len(REQUIRED_BENCHMARK_FILES),
            "missing_required_files": missing_required_files,
            "implementation_slots": implementation_slot_count,
            "expected_implementation_slots": len(benchmarks) * len(EXPECTED_IMPLEMENTATION_NAMES),
            "missing_implementation_slots": missing_implementation_slots,
        },
    }


def suite_benchmark_roots(suite_root: Path) -> list[Path]:
    """Return the direct subdirectories containing ``benchmark.json``, sorted by name."""
    return sorted(
        [path for path in suite_root.iterdir() if path.is_dir() and (path / "benchmark.json").is_file()],
        key=lambda path: path.name,
    )


def suite_benchmark_metadata(
    suite_root: Path,
    root: Path,
    spec: BenchmarkSpec,
    implementations: list[Implementation],
) -> dict[str, object]:
    """Describe one benchmark for the suite listing.

    Paths in the result are relative to ``suite_root``; each required file is
    reported as ``"present"`` or ``"missing"``.
    """
    return {
        "name": spec.name,
        "directory": str(root.relative_to(suite_root)),
        "source_stem": spec.source_stem,
        "timing_modes": TIMING_MODES,
        "loop_count_source": "stdin",
        "loop_count": spec.loop_count,
        "hot_loop_count": spec.hot_loop_count,
        "expected_checksum": spec.expected_checksum,
        "hot_expected_checksum": spec.hot_expected_checksum,
        "required_files": [
            {
                "path": relative,
                "status": "present" if (root / relative).is_file() else "missing",
            }
            for relative in REQUIRED_BENCHMARK_FILES
        ],
        "checksum_metadata": {
            "cold_process": {
                "expected_checksum": spec.expected_checksum,
                "stdin": spec.stdin_text,
            },
            "hot_loop": {
                "expected_checksum": spec.hot_expected_checksum,
                "stdin": spec.hot_stdin_text,
            },
        },
        "run_args": spec.run_args,
        "implementation_slots": [
            {
                "name": impl.name,
                "language": impl.language,
                "source": str(impl.source.relative_to(suite_root)),
            }
            for impl in implementations
        ],
    }


def emit_suite_list(metadata: dict[str, object], as_json: bool) -> None:
    """Print the suite catalog built by build_suite_catalog(), as JSON or as text."""
    if as_json:
        print(json.dumps(metadata, indent=2, sort_keys=True))
        return
    # NOTE(review): the leading whitespace inside the literals below is kept
    # exactly as found; the nested entries may have been meant to use deeper
    # indentation - confirm against version control before changing it.
    print(f"{metadata['suite']}: {metadata['timing_scope']}")
    print(str(metadata["timing_disclaimer"]))
    print(f"benchmark_count={metadata['benchmark_count']}")
    print(f"timing_modes={','.join(TIMING_MODES)}")
    verification = metadata["verification"]
    assert isinstance(verification, dict)
    print(f"verification_status={verification['status']}")
    print(f"required_files={verification['required_files']}")
    print(f"implementation_slots={verification['implementation_slots']}")
    print("benchmarks:")
    for benchmark in metadata["benchmarks"]:
        assert isinstance(benchmark, dict)
        print(
            " {name} ({directory}): loop_count={loop_count} hot_loop_count={hot_loop_count} "
            "expected_checksum={expected_checksum} hot_expected_checksum={hot_expected_checksum}".format(
                name=benchmark["name"],
                directory=benchmark["directory"],
                loop_count=benchmark["loop_count"],
                hot_loop_count=benchmark["hot_loop_count"],
                expected_checksum=benchmark["expected_checksum"],
                hot_expected_checksum=benchmark["hot_expected_checksum"],
            )
        )
        print(" required_files:")
        for required_file in benchmark["required_files"]:
            assert isinstance(required_file, dict)
            print(f" {required_file['path']}: {required_file['status']}")
        print(" implementations:")
        for implementation in benchmark["implementation_slots"]:
            assert isinstance(implementation, dict)
            print(f" {implementation['name']}: {implementation['language']} ({implementation['source']})")


def emit_list(root: Path, spec: BenchmarkSpec, implementations: list[Implementation], as_json: bool) -> None:
    """Print metadata for one benchmark (``--list``), as JSON or as text."""
    metadata = {
        "benchmark": spec.name,
        "loop_count": spec.loop_count,
        "hot_loop_count": spec.hot_loop_count,
        "expected_checksum": spec.expected_checksum,
        "hot_expected_checksum": spec.hot_expected_checksum,
        "timing_scope": TIMING_SCOPE,
        "timing_modes": TIMING_MODES,
        "loop_count_source": "stdin",
        "run_args": spec.run_args,
        "implementations": [
            {"name": impl.name, "language": impl.language, "source": str(impl.source.relative_to(root))}
            for impl in implementations
        ],
    }
    if as_json:
        print(json.dumps(metadata, indent=2, sort_keys=True))
        return
    print(f"{spec.name}: {TIMING_SCOPE}")
    print(f"loop_count={spec.loop_count}")
    print(f"hot_loop_count={spec.hot_loop_count}")
    print("loop_count_source=stdin")
    if spec.run_args:
        print(f"run_args={' '.join(spec.run_args)}")
    print(f"expected_checksum={spec.expected_checksum}")
    print("implementations:")
    for impl in implementations:
        print(f" {impl.name}: {impl.language} ({impl.source.relative_to(root)})")


def emit_dry_run(root: Path, spec: BenchmarkSpec, implementations: list[Implementation], args: argparse.Namespace) -> None:
    """Print the build and run commands that would be executed, without running them."""
    params = run_parameters(spec, args.mode)
    print(f"{spec.name}: {TIMING_SCOPE}")
    print(f"mode={params.mode}")
    print(f"loop_count={params.loop_count}")
    print(f"expected_checksum={params.expected_checksum}")
    for impl in implementations:
        build_command, skip_reasons = impl.build(root, spec, args)
        print(f"{impl.name}:")
        if skip_reasons:
            print(f" skip: {'; '.join(skip_reasons)}")
            continue
        if build_command:
            print(f" build: {format_command(build_command)}")
        else:
            print(" build: none")
        print(f" stdin: {params.stdin_text.rstrip()}")
        print(f" run: {format_command(impl.run(root, spec, args))}")


def run_benchmarks(root: Path, spec: BenchmarkSpec, implementations: list[Implementation], args: argparse.Namespace) -> list[dict[str, object]]:
    """Create the build directory and run every implementation in order."""
    build_dir(root).mkdir(exist_ok=True)
    return [run_one(root, spec, impl, args) for impl in implementations]


def _normalization_factor(params: RunParameters) -> float:
    """Return how many base-sized runs one measured run represents.

    A non-positive loop count cannot be normalized meaningfully, so 1.0 (no
    normalization) is returned instead of dividing by zero.
    """
    if params.loop_count <= 0 or params.base_loop_count <= 0:
        return 1.0
    return params.loop_count / params.base_loop_count


def run_one(root: Path, spec: BenchmarkSpec, impl: Implementation, args: argparse.Namespace) -> dict[str, object]:
    """Build, warm up and time a single implementation.

    Returns:
        A result mapping whose ``status`` is ``"skipped"`` (tooling missing),
        ``"failed"`` (build, warmup or timed run failed) or ``"ok"`` (with
        min/median/max wall-clock timings in milliseconds, raw and
        normalized).
    """
    params = run_parameters(spec, args.mode)
    build_command, skip_reasons = impl.build(root, spec, args)
    if skip_reasons:
        return skipped_result(impl, skip_reasons)
    if build_command:
        build = run_command(build_command)
        if build.returncode != 0:
            return failed_result(impl, "build failed", build)

    run_command_line = impl.run(root, spec, args)
    for _ in range(args.warmups):
        warmup = run_command(run_command_line, params.stdin_text)
        if not run_succeeded_for_params(warmup, params):
            return failed_result(impl, "warmup failed", warmup)

    timings: list[int] = []
    for _ in range(args.repeats):
        # The measured interval covers process start-up as well as the run.
        start = time.perf_counter_ns()
        run = run_command(run_command_line, params.stdin_text)
        elapsed = time.perf_counter_ns() - start
        if not run_succeeded_for_params(run, params):
            return failed_result(impl, "run failed", run)
        timings.append(elapsed)

    min_ms = ns_to_ms(min(timings))
    median_ms = ns_to_ms(int(statistics.median(timings)))
    max_ms = ns_to_ms(max(timings))
    normalization_factor = _normalization_factor(params)
    return {
        "name": impl.name,
        "language": impl.language,
        "status": "ok",
        "timing_mode": params.mode,
        "loop_count": params.loop_count,
        "base_loop_count": params.base_loop_count,
        "normalization_factor": normalization_factor,
        "repeats": args.repeats,
        "warmups": args.warmups,
        "checksum": params.expected_checksum,
        "min_ms": min_ms,
        "median_ms": median_ms,
        "max_ms": max_ms,
        "normalized_min_ms": min_ms / normalization_factor,
        "normalized_median_ms": median_ms / normalization_factor,
        "normalized_max_ms": max_ms / normalization_factor,
        "timing_scope": TIMING_SCOPE,
    }


def skipped_result(impl: Implementation, reasons: list[str]) -> dict[str, object]:
    """Build the result mapping for an implementation that could not be attempted."""
    return {"name": impl.name, "language": impl.language, "status": "skipped", "reason": "; ".join(reasons)}


def failed_result(impl: Implementation, message: str, process: subprocess.CompletedProcess[str]) -> dict[str, object]:
    """Build the result mapping for a failed step, including the process output."""
    return {
        "name": impl.name,
        "language": impl.language,
        "status": "failed",
        "reason": message,
        "returncode": process.returncode,
        "stdout": process.stdout,
        "stderr": process.stderr,
    }


def emit_results(spec: BenchmarkSpec, results: list[dict[str, object]], as_json: bool) -> None:
    """Print benchmark results as JSON or as one text line per implementation."""
    if as_json:
        print(
            json.dumps(
                {
                    "benchmark": spec.name,
                    "base_loop_count": spec.loop_count,
                    "timing_scope": TIMING_SCOPE,
                    "results": results,
                },
                indent=2,
                sort_keys=True,
            )
        )
        return

    # Only successful results carry a timing mode; use the first one for the
    # header line.
    mode = next((str(result["timing_mode"]) for result in results if result["status"] == "ok"), "unknown")
    if mode == "hot-loop":
        loop_count = next((int(result["loop_count"]) for result in results if result["status"] == "ok"), spec.hot_loop_count)
        print(
            f"{spec.name}: {TIMING_SCOPE} "
            f"(mode=hot-loop, loop_count={loop_count}, normalized_to={spec.loop_count})"
        )
    else:
        print(f"{spec.name}: {TIMING_SCOPE}")

    for result in results:
        status = result["status"]
        name = result["name"]
        if status == "ok":
            if result["timing_mode"] == "hot-loop":
                print(
                    "{name}: total_min={min_ms:.3f}ms total_median={median_ms:.3f}ms "
                    "total_max={max_ms:.3f}ms normalized_median={normalized_median_ms:.3f}ms".format(
                        name=name,
                        min_ms=result["min_ms"],
                        median_ms=result["median_ms"],
                        max_ms=result["max_ms"],
                        normalized_median_ms=result["normalized_median_ms"],
                    )
                )
            else:
                print(
                    "{name}: min={min_ms:.3f}ms median={median_ms:.3f}ms max={max_ms:.3f}ms".format(
                        name=name,
                        min_ms=result["min_ms"],
                        median_ms=result["median_ms"],
                        max_ms=result["max_ms"],
                    )
                )
        else:
            print(f"{name}: {status} ({result['reason']})")


def run_command(command: list[str], stdin_text: str | None = None) -> subprocess.CompletedProcess[str]:
    """Run ``command`` with captured text output and optional stdin.

    A command that cannot be started (missing or non-executable program) does
    not raise; it is reported as a failed process so that one misconfigured
    tool path yields a "failed" result instead of aborting the whole run.

    Returns:
        The completed process. When the program could not be started the
        return code is 127 (not found) or 126 (any other OS error), stdout is
        empty and stderr describes the error.
    """
    try:
        return subprocess.run(
            command,
            input=stdin_text,
            text=True,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            check=False,
        )
    except OSError as error:
        returncode = _EXIT_NOT_FOUND if isinstance(error, FileNotFoundError) else _EXIT_CANNOT_EXECUTE
        return subprocess.CompletedProcess(
            command,
            returncode,
            stdout="",
            stderr=f"failed to start {format_command(command)}: {error}",
        )


def normalized_stdout(stdout: str) -> str:
    """Return the last non-blank line of ``stdout``, stripped, or ``""`` if there is none."""
    lines = [line.strip() for line in stdout.splitlines() if line.strip()]
    if not lines:
        return ""
    return lines[-1]


def run_parameters(spec: BenchmarkSpec, mode: str) -> RunParameters:
    """Select the spec values for ``mode``; anything but ``"hot-loop"`` means cold-process."""
    if mode == "hot-loop":
        return RunParameters(
            mode=mode,
            loop_count=spec.hot_loop_count,
            expected_checksum=spec.hot_expected_checksum,
            stdin_text=spec.hot_stdin_text,
            base_loop_count=spec.loop_count,
        )
    return RunParameters(
        mode="cold-process",
        loop_count=spec.loop_count,
        expected_checksum=spec.expected_checksum,
        stdin_text=spec.stdin_text,
        base_loop_count=spec.loop_count,
    )


def run_succeeded_for_params(process: subprocess.CompletedProcess[str], params: RunParameters) -> bool:
    """Return True when the run printed the expected checksum.

    In cold-process mode the exit status must also be zero.
    """
    if normalized_stdout(process.stdout) != params.expected_checksum:
        return False
    # NOTE(review): hot-loop runs are accepted regardless of exit status;
    # presumably deliberate - confirm before tightening.
    if params.mode == "hot-loop":
        return True
    return process.returncode == 0


def ns_to_ms(value: int) -> float:
    """Convert nanoseconds to milliseconds."""
    return value / 1_000_000.0


def first_available(candidates: list[str]) -> str | None:
    """Return the resolved path of the first candidate found on ``PATH``, or None."""
    for candidate in candidates:
        found = shutil.which(candidate)
        if found:
            return found
    return None


def format_command(command: list[str]) -> str:
    """Render ``command`` as a single shell-style string for display."""
    return " ".join(shlex_quote(part) for part in command)


def shlex_quote(value: str) -> str:
    """Quote ``value`` for display in a POSIX-shell style command line.

    Non-empty values made only of alphanumerics and ``/._:-`` are returned
    unchanged; everything else is wrapped in single quotes with embedded
    single quotes escaped.
    """
    if value and all(char.isalnum() or char in "/._:-" for char in value):
        return value
    return "'" + value.replace("'", "'\"'\"'") + "'"


if __name__ == "__main__":
    raise SystemExit(main(Path(__file__).resolve().parent, sys.argv[1:]))