523 lines
18 KiB
Python
523 lines
18 KiB
Python
#!/usr/bin/env python3
|
|
"""Shared local benchmark runner for Glagol benchmark scaffolds."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import json
|
|
import os
|
|
import shutil
|
|
import statistics
|
|
import subprocess
|
|
import sys
|
|
import time
|
|
from dataclasses import dataclass
|
|
from pathlib import Path
|
|
from typing import Callable
|
|
|
|
|
|
# Label attached to every listing and result so readers do not treat the
# timings as portable across machines.
TIMING_SCOPE = "local-machine comparison only"

# Values accepted by the --mode command-line option.
TIMING_MODES = [
    "cold-process",
    "hot-loop",
]
|
|
|
|
|
|
@dataclass(frozen=True)
class BenchmarkSpec:
    """Immutable description of one benchmark, as loaded from ``benchmark.json``."""

    # Benchmark identifier; also embedded in the names of built executables.
    name: str
    # File stem shared by the per-language source files.
    source_stem: str
    # Loop count for a cold-process run; also the normalization baseline.
    loop_count: int
    # Last non-blank stdout line expected from a cold-process run.
    expected_checksum: str
    # Text piped to stdin for a cold-process run.
    stdin_text: str
    # Loop count for a hot-loop run.
    hot_loop_count: int
    # Last non-blank stdout line expected from a hot-loop run.
    hot_expected_checksum: str
    # Text piped to stdin for a hot-loop run.
    hot_stdin_text: str
    # Extra command-line arguments appended to every run command.
    run_args: list[str]
|
|
|
|
|
|
@dataclass(frozen=True)
class RunParameters:
    """Immutable per-mode settings used for a single benchmark execution."""

    # Timing mode these settings belong to ("cold-process" or "hot-loop").
    mode: str
    # Loop count actually executed in this mode.
    loop_count: int
    # Last non-blank stdout line a successful run must produce.
    expected_checksum: str
    # Text piped to the process on stdin.
    stdin_text: str
    # Baseline loop count that timings are normalized against.
    base_loop_count: int
|
|
|
|
|
|
@dataclass(frozen=True)
class Implementation:
    """One language implementation of the benchmark plus its build/run hooks."""

    # Short identifier, selectable through the --only option.
    name: str
    # Human-readable language label shown in listings and results.
    language: str
    # Path of the implementation's source file.
    source: Path
    # Hook returning ``(build_command, skip_reasons)``; an empty command means
    # nothing needs building, and non-empty skip reasons mean "do not run".
    build: Callable[[Path, BenchmarkSpec, argparse.Namespace], tuple[list[str], list[str] | None]]
    # Hook returning the command line that executes the implementation.
    run: Callable[[Path, BenchmarkSpec, argparse.Namespace], list[str]]
|
|
|
|
|
|
def main(root: Path, argv: list[str]) -> int:
    """Parse the command line and list, dry-run, or execute the benchmark in ``root``.

    Args:
        root: Benchmark directory; ``benchmark.json`` is read from it.
        argv: Command-line arguments, without the program name.

    Returns:
        ``1`` when at least one benchmark result has status ``"failed"``;
        otherwise ``0``. ``--list`` and ``--dry-run`` always return ``0``.
    """
    spec = read_spec(root)
    known = available_implementations(root, spec)

    cli = argparse.ArgumentParser(description=f"Run local {spec.name} timing comparisons.")

    switches = (
        ("--list", "print benchmark metadata and exit"),
        ("--json", "emit JSON for --list or results"),
        ("--dry-run", "print planned commands without running them"),
    )
    for flag, description in switches:
        cli.add_argument(flag, action="store_true", help=description)

    cli.add_argument(
        "--mode",
        choices=TIMING_MODES,
        default="cold-process",
        help="cold-process measures one normal run; hot-loop uses an amplified loop count and normalized results",
    )
    # Only implementations whose source file exists can be selected.
    cli.add_argument("--only", choices=[impl.name for impl in known], action="append")
    cli.add_argument("--repeats", type=positive_int, default=5)
    cli.add_argument("--warmups", type=non_negative_int, default=1)

    tool_overrides = (
        ("--glagol", "path to the glagol compiler binary"),
        ("--cc", "path to the C compiler"),
        ("--clojure", "path to the clojure command"),
        ("--clojure-jar", "path to a clojure jar for `java -cp ... clojure.main`"),
        ("--sbcl", "path to the SBCL executable for Common Lisp comparisons"),
    )
    for flag, description in tool_overrides:
        cli.add_argument(flag, help=description)

    args = cli.parse_args(argv)
    chosen = select_implementations(known, args.only)

    if args.list:
        emit_list(root, spec, chosen, args.json)
        return 0

    if args.dry_run:
        emit_dry_run(root, spec, chosen, args)
        return 0

    results = run_benchmarks(root, spec, chosen, args)
    emit_results(spec, results, args.json)

    # Any failed implementation makes the whole invocation fail.
    for result in results:
        if result["status"] == "failed":
            return 1
    return 0
|
|
|
|
|
|
def read_spec(root: Path) -> BenchmarkSpec:
    """Load ``root / "benchmark.json"`` and convert it into a ``BenchmarkSpec``.

    ``benchmark``, ``source_stem``, ``loop_count`` and ``expected_checksum``
    are required keys. Optional keys fall back as follows: ``stdin`` and
    ``hot_stdin`` default to the matching loop count followed by a newline,
    ``hot_loop_count`` to ``loop_count``, ``hot_expected_checksum`` to
    ``expected_checksum``, and ``run_args`` to an empty list.
    """
    spec_path = root / "benchmark.json"
    data = json.loads(spec_path.read_text(encoding="utf-8"))

    loop_count = int(data["loop_count"])
    name = str(data["benchmark"])
    source_stem = str(data["source_stem"])
    expected_checksum = str(data["expected_checksum"])
    stdin_text = str(data.get("stdin", f"{loop_count}\n"))
    hot_loop_count = int(data.get("hot_loop_count", loop_count))
    hot_expected_checksum = str(data.get("hot_expected_checksum", data["expected_checksum"]))
    hot_stdin_text = str(data.get("hot_stdin", f"{int(data.get('hot_loop_count', loop_count))}\n"))
    run_args = [str(item) for item in data.get("run_args", [])]

    return BenchmarkSpec(
        name=name,
        source_stem=source_stem,
        loop_count=loop_count,
        expected_checksum=expected_checksum,
        stdin_text=stdin_text,
        hot_loop_count=hot_loop_count,
        hot_expected_checksum=hot_expected_checksum,
        hot_stdin_text=hot_stdin_text,
        run_args=run_args,
    )
|
|
|
|
|
|
def available_implementations(root: Path, spec: BenchmarkSpec) -> list[Implementation]:
    """Return the implementations whose source file exists under ``root``.

    The result keeps a fixed order: slovo, c, rust, python, clojure,
    common_lisp.
    """
    stem = spec.source_stem
    catalogue = (
        ("slovo", "Slovo", root / "src" / "main.slo", slovo_build, slovo_run),
        ("c", "C", root / "c" / f"{stem}.c", c_build, c_run),
        ("rust", "Rust", root / "rust" / f"{stem}.rs", rust_build, rust_run),
        ("python", "Python", root / "python" / f"{stem}.py", python_build, python_run),
        ("clojure", "Clojure", root / "clojure" / f"{stem}.clj", clojure_build, clojure_run),
        (
            "common_lisp",
            "Common Lisp (SBCL)",
            root / "common-lisp" / f"{stem}.lisp",
            common_lisp_build,
            common_lisp_run,
        ),
    )

    present: list[Implementation] = []
    for name, language, source, build, run in catalogue:
        implementation = Implementation(name, language, source, build, run)
        if implementation.source.is_file():
            present.append(implementation)
    return present
|
|
|
|
|
|
def slovo_compiler(root: Path, args: argparse.Namespace) -> str | None:
    """Locate the glagol compiler used to build the Slovo implementation.

    Lookup order: the ``--glagol`` option, the ``GLAGOL`` environment
    variable, a debug build under the grandparent of ``root``
    (``compiler/target/debug``), and finally ``glagol`` on ``PATH``.

    Args:
        root: Benchmark directory.
        args: Parsed command-line options; only ``args.glagol`` is read.

    Returns:
        The compiler path, or ``None`` when no compiler was found.
    """
    if args.glagol:
        return args.glagol
    env_path = os.environ.get("GLAGOL")
    if env_path:
        return env_path
    # A shallow root (for example ``Path(".")``) has fewer than two ancestors,
    # so ``root.parents[1]`` would raise IndexError. Skip the in-tree candidate
    # in that case and fall through to the PATH lookup.
    ancestors = root.parents
    if len(ancestors) > 1:
        candidate = ancestors[1] / "compiler" / "target" / "debug" / executable("glagol")
        if candidate.is_file():
            return str(candidate)
    return shutil.which("glagol")
|
|
|
|
|
|
def executable(name: str) -> str:
    """Return ``name`` with ``.exe`` appended when ``os.name`` is ``"nt"``."""
    if os.name == "nt":
        return name + ".exe"
    return name
|
|
|
|
|
|
def slovo_build(root: Path, spec: BenchmarkSpec, args: argparse.Namespace) -> tuple[list[str], list[str] | None]:
    """Plan the glagol build of the Slovo implementation.

    Returns:
        ``(command, None)`` when the build can run, or ``([], reasons)`` when
        the glagol compiler or clang is unavailable.
    """
    compiler = slovo_compiler(root, args)
    if compiler is None:
        return [], ["missing glagol compiler; set GLAGOL or pass --glagol"]

    # clang counts as available when GLAGOL_CLANG is set or clang is on PATH.
    clang_known = os.environ.get("GLAGOL_CLANG") is not None or shutil.which("clang") is not None
    if not clang_known:
        return [], ["missing clang for glagol build; set GLAGOL_CLANG"]

    target = build_dir(root) / executable(f"slovo-{spec.name}")
    command = [compiler, "build", str(root), "-o", str(target)]
    return command, None
|
|
|
|
|
|
def slovo_run(root: Path, spec: BenchmarkSpec, _args: argparse.Namespace) -> list[str]:
    """Return the command that runs the built Slovo executable.

    The loop count for the selected ``_args.mode`` is appended as the final
    argument, after ``spec.run_args``.
    """
    binary = build_dir(root) / executable(f"slovo-{spec.name}")
    loop_count = run_parameters(spec, _args.mode).loop_count
    command = [str(binary)]
    command.extend(spec.run_args)
    command.append(str(loop_count))
    return command
|
|
|
|
|
|
def c_build(root: Path, spec: BenchmarkSpec, args: argparse.Namespace) -> tuple[list[str], list[str] | None]:
    """Plan the ``-O2 -std=c11`` build of the C implementation.

    The compiler is taken from ``--cc``, then the ``CC`` environment
    variable, then the first of ``clang``, ``cc``, ``gcc`` found on ``PATH``.

    Returns:
        ``(command, None)``, or ``([], reasons)`` when no compiler is found.
    """
    compiler = args.cc
    if not compiler:
        compiler = os.environ.get("CC")
    if not compiler:
        compiler = first_available(["clang", "cc", "gcc"])
    if compiler is None:
        return [], ["missing C compiler; set CC or pass --cc"]

    source = root / "c" / f"{spec.source_stem}.c"
    target = build_dir(root) / executable(f"c-{spec.name}")
    return [compiler, "-O2", "-std=c11", str(source), "-o", str(target)], None
|
|
|
|
|
|
def c_run(root: Path, spec: BenchmarkSpec, _args: argparse.Namespace) -> list[str]:
    """Return the command that runs the built C executable with ``spec.run_args``."""
    binary = build_dir(root) / executable(f"c-{spec.name}")
    command = [str(binary)]
    command.extend(spec.run_args)
    return command
|
|
|
|
|
|
def rust_build(root: Path, spec: BenchmarkSpec, _args: argparse.Namespace) -> tuple[list[str], list[str] | None]:
    """Plan the ``rustc`` build (``opt-level=3``, ``debuginfo=0``) of the Rust implementation.

    Returns:
        ``(command, None)``, or ``([], reasons)`` when ``rustc`` is not on ``PATH``.
    """
    rustc = first_available(["rustc"])
    if rustc is None:
        return [], ["missing rustc"]

    source = root / "rust" / f"{spec.source_stem}.rs"
    target = build_dir(root) / executable(f"rust-{spec.name}")
    codegen_flags = ["-C", "opt-level=3", "-C", "debuginfo=0"]
    command = [rustc, *codegen_flags, str(source), "-o", str(target)]
    return command, None
|
|
|
|
|
|
def rust_run(root: Path, spec: BenchmarkSpec, _args: argparse.Namespace) -> list[str]:
    """Return the command that runs the built Rust executable with ``spec.run_args``."""
    binary = build_dir(root) / executable(f"rust-{spec.name}")
    command = [str(binary)]
    command.extend(spec.run_args)
    return command
|
|
|
|
|
|
def python_build(_root: Path, _spec: BenchmarkSpec, _args: argparse.Namespace) -> tuple[list[str], list[str] | None]:
    """Report that the Python implementation needs no build step and is never skipped."""
    no_command: list[str] = []
    return no_command, None
|
|
|
|
|
|
def python_run(root: Path, spec: BenchmarkSpec, _args: argparse.Namespace) -> list[str]:
    """Return the command that runs the Python implementation with the current interpreter."""
    script = root / "python" / f"{spec.source_stem}.py"
    command = [sys.executable, str(script)]
    command.extend(spec.run_args)
    return command
|
|
|
|
|
|
def clojure_build(_root: Path, _spec: BenchmarkSpec, args: argparse.Namespace) -> tuple[list[str], list[str] | None]:
    """Report whether the Clojure implementation can run; it has no build step.

    Returns:
        ``([], None)`` when a Clojure launcher is available, otherwise
        ``([], reasons)``.
    """
    launcher = clojure_command(args)
    if launcher is not None:
        return [], None
    return [], ["missing clojure command; set CLOJURE, pass --clojure, or set CLOJURE_JAR"]
|
|
|
|
|
|
def clojure_run(root: Path, spec: BenchmarkSpec, args: argparse.Namespace) -> list[str]:
    """Return the command that runs the Clojure source with ``spec.run_args``.

    Asserts that a Clojure launcher is available.
    """
    script = root / "clojure" / f"{spec.source_stem}.clj"
    source = str(script)
    launcher = clojure_command(args)
    assert launcher is not None
    command = list(launcher)
    command.append(source)
    command.extend(spec.run_args)
    return command
|
|
|
|
|
|
def clojure_command(args: argparse.Namespace) -> list[str] | None:
    """Resolve the command prefix used to launch Clojure.

    Lookup order: ``--clojure``, the ``CLOJURE`` environment variable, and
    ``clojure`` on ``PATH``. Failing those, ``java -cp <jar> clojure.main``
    is used when both ``java`` and a jar (``--clojure-jar`` or
    ``CLOJURE_JAR``) are available.

    Returns:
        The command prefix as a list, or ``None`` when nothing usable exists.
    """
    launcher = args.clojure or os.environ.get("CLOJURE") or shutil.which("clojure")
    if launcher:
        return [launcher]

    jar = args.clojure_jar or os.environ.get("CLOJURE_JAR")
    java = shutil.which("java")
    if not (jar and java):
        return None
    return [java, "-cp", jar, "clojure.main"]
|
|
|
|
|
|
def common_lisp_build(_root: Path, _spec: BenchmarkSpec, args: argparse.Namespace) -> tuple[list[str], list[str] | None]:
    """Report whether the Common Lisp implementation can run; it has no build step.

    Returns:
        ``([], None)`` when SBCL is available, otherwise ``([], reasons)``.
    """
    sbcl = sbcl_command(args)
    if sbcl is not None:
        return [], None
    return [], ["missing SBCL; set SBCL or pass --sbcl"]
|
|
|
|
|
|
def common_lisp_run(root: Path, spec: BenchmarkSpec, args: argparse.Namespace) -> list[str]:
    """Return the SBCL command that runs the Lisp source as a script.

    Asserts that SBCL is available.
    """
    sbcl = sbcl_command(args)
    assert sbcl is not None
    script = root / "common-lisp" / f"{spec.source_stem}.lisp"
    sbcl_flags = ["--noinform", "--disable-debugger", "--script"]
    command = [sbcl]
    command.extend(sbcl_flags)
    command.append(str(script))
    command.extend(spec.run_args)
    return command
|
|
|
|
|
|
def sbcl_command(args: argparse.Namespace) -> str | None:
    """Resolve the SBCL executable.

    Lookup order: ``--sbcl``, the ``SBCL`` environment variable, then
    ``sbcl`` on ``PATH``. Returns ``None`` when none of them yields a value.
    """
    return args.sbcl or os.environ.get("SBCL") or shutil.which("sbcl")
|
|
|
|
|
|
def build_dir(root: Path) -> Path:
    """Return the ``build`` directory path under ``root`` (the directory is not created here)."""
    return root.joinpath("build")
|
|
|
|
|
|
def positive_int(value: str) -> int:
    """Convert ``value`` to an integer greater than zero (argparse ``type=`` callable).

    Raises:
        argparse.ArgumentTypeError: If the parsed integer is zero or negative.
        ValueError: If ``value`` is not an integer literal.
    """
    number = int(value)
    if number > 0:
        return number
    raise argparse.ArgumentTypeError("value must be greater than zero")
|
|
|
|
|
|
def non_negative_int(value: str) -> int:
    """Convert ``value`` to an integer that is zero or greater (argparse ``type=`` callable).

    Raises:
        argparse.ArgumentTypeError: If the parsed integer is negative.
        ValueError: If ``value`` is not an integer literal.
    """
    number = int(value)
    if number >= 0:
        return number
    raise argparse.ArgumentTypeError("value must be zero or greater")
|
|
|
|
|
|
def select_implementations(implementations: list[Implementation], names: list[str] | None) -> list[Implementation]:
    """Keep only the implementations named in ``names``, preserving their order.

    When ``names`` is ``None`` or empty, the input list itself is returned.
    """
    if names:
        wanted = frozenset(names)
        return list(filter(lambda impl: impl.name in wanted, implementations))
    return implementations
|
|
|
|
|
|
def emit_list(root: Path, spec: BenchmarkSpec, implementations: list[Implementation], as_json: bool) -> None:
    """Print benchmark metadata and the selected implementations.

    Args:
        root: Benchmark directory; source paths are shown relative to it.
        spec: Benchmark description.
        implementations: Implementations to list.
        as_json: Emit sorted, indented JSON instead of plain text.
    """
    sources = [
        {"name": impl.name, "language": impl.language, "source": str(impl.source.relative_to(root))}
        for impl in implementations
    ]
    metadata = {
        "benchmark": spec.name,
        "loop_count": spec.loop_count,
        "hot_loop_count": spec.hot_loop_count,
        "expected_checksum": spec.expected_checksum,
        "hot_expected_checksum": spec.hot_expected_checksum,
        "timing_scope": TIMING_SCOPE,
        "timing_modes": TIMING_MODES,
        "loop_count_source": "stdin",
        "run_args": spec.run_args,
        "implementations": sources,
    }

    if as_json:
        print(json.dumps(metadata, indent=2, sort_keys=True))
        return

    lines = [
        f"{spec.name}: {TIMING_SCOPE}",
        f"loop_count={spec.loop_count}",
        f"hot_loop_count={spec.hot_loop_count}",
        "loop_count_source=stdin",
    ]
    # run_args is shown only when the spec defines any.
    if spec.run_args:
        lines.append(f"run_args={' '.join(spec.run_args)}")
    lines.append(f"expected_checksum={spec.expected_checksum}")
    lines.append("implementations:")
    lines.extend(
        f" {impl.name}: {impl.language} ({impl.source.relative_to(root)})"
        for impl in implementations
    )
    for line in lines:
        print(line)
|
|
|
|
|
|
def emit_dry_run(root: Path, spec: BenchmarkSpec, implementations: list[Implementation], args: argparse.Namespace) -> None:
    """Print the build and run commands each implementation would use, without running them.

    Implementations whose build hook reports skip reasons are listed with
    those reasons instead of commands.
    """
    params = run_parameters(spec, args.mode)
    header = (
        f"{spec.name}: {TIMING_SCOPE}",
        f"mode={params.mode}",
        f"loop_count={params.loop_count}",
        f"expected_checksum={params.expected_checksum}",
    )
    for line in header:
        print(line)

    for impl in implementations:
        build_command, skip_reasons = impl.build(root, spec, args)
        print(f"{impl.name}:")
        if skip_reasons:
            print(f" skip: {'; '.join(skip_reasons)}")
            continue
        # An empty build command means the implementation needs no build.
        build_text = format_command(build_command) if build_command else "none"
        print(f" build: {build_text}")
        print(f" stdin: {params.stdin_text.rstrip()}")
        run_text = format_command(impl.run(root, spec, args))
        print(f" run: {run_text}")
|
|
|
|
|
|
def run_benchmarks(root: Path, spec: BenchmarkSpec, implementations: list[Implementation], args: argparse.Namespace) -> list[dict[str, object]]:
    """Create the build directory if needed and benchmark each implementation in order.

    Returns:
        One result dictionary per implementation, in input order.
    """
    build_dir(root).mkdir(exist_ok=True)
    results: list[dict[str, object]] = []
    for impl in implementations:
        results.append(run_one(root, spec, impl, args))
    return results
|
|
|
|
|
|
def run_one(root: Path, spec: BenchmarkSpec, impl: Implementation, args: argparse.Namespace) -> dict[str, object]:
    """Build, warm up and time a single implementation.

    Returns:
        A result dictionary with status ``"skipped"`` (build hook reported
        skip reasons), ``"failed"`` (build, warmup or timed run failed), or
        ``"ok"`` with min/median/max wall-clock times in milliseconds and
        their values normalized to the base loop count.
    """
    params = run_parameters(spec, args.mode)

    build_command, skip_reasons = impl.build(root, spec, args)
    if skip_reasons:
        return skipped_result(impl, skip_reasons)

    if build_command:
        build_process = run_command(build_command)
        if build_process.returncode != 0:
            return failed_result(impl, "build failed", build_process)

    command = impl.run(root, spec, args)

    # Untimed runs; any failure aborts before timing starts.
    for _ in range(args.warmups):
        warmup_process = run_command(command, params.stdin_text)
        if not run_succeeded_for_params(warmup_process, params):
            return failed_result(impl, "warmup failed", warmup_process)

    samples_ns: list[int] = []
    for _ in range(args.repeats):
        started = time.perf_counter_ns()
        timed_process = run_command(command, params.stdin_text)
        finished = time.perf_counter_ns()
        if not run_succeeded_for_params(timed_process, params):
            return failed_result(impl, "run failed", timed_process)
        samples_ns.append(finished - started)

    totals = {
        "min_ms": ns_to_ms(min(samples_ns)),
        "median_ms": ns_to_ms(int(statistics.median(samples_ns))),
        "max_ms": ns_to_ms(max(samples_ns)),
    }
    # Ratio of the executed loop count to the base loop count.
    normalization_factor = params.loop_count / params.base_loop_count

    report: dict[str, object] = {
        "name": impl.name,
        "language": impl.language,
        "status": "ok",
        "timing_mode": params.mode,
        "loop_count": params.loop_count,
        "base_loop_count": params.base_loop_count,
        "normalization_factor": normalization_factor,
        "repeats": args.repeats,
        "warmups": args.warmups,
        "checksum": params.expected_checksum,
    }
    report.update(totals)
    for key, total_ms in totals.items():
        report[f"normalized_{key}"] = total_ms / normalization_factor
    report["timing_scope"] = TIMING_SCOPE
    return report
|
|
|
|
|
|
def skipped_result(impl: Implementation, reasons: list[str]) -> dict[str, object]:
    """Build the result dictionary for an implementation that was not run.

    The reasons are joined with ``"; "`` into a single ``reason`` string.
    """
    result: dict[str, object] = {"name": impl.name, "language": impl.language}
    result["status"] = "skipped"
    result["reason"] = "; ".join(reasons)
    return result
|
|
|
|
|
|
def failed_result(impl: Implementation, message: str, process: subprocess.CompletedProcess[str]) -> dict[str, object]:
    """Build the result dictionary for a failed build, warmup or timed run.

    The process's return code, stdout and stderr are copied into the result.
    """
    result: dict[str, object] = {
        "name": impl.name,
        "language": impl.language,
        "status": "failed",
        "reason": message,
    }
    for field in ("returncode", "stdout", "stderr"):
        result[field] = getattr(process, field)
    return result
|
|
|
|
|
|
def emit_results(spec: BenchmarkSpec, results: list[dict[str, object]], as_json: bool) -> None:
    """Print benchmark results as JSON or as one text line per implementation.

    In text mode the headline reflects the timing mode of the first
    successful result; hot-loop results show total timings plus the
    normalized median, while skipped or failed results show their reason.
    """
    if as_json:
        payload = {
            "benchmark": spec.name,
            "base_loop_count": spec.loop_count,
            "timing_scope": TIMING_SCOPE,
            "results": results,
        }
        print(json.dumps(payload, indent=2, sort_keys=True))
        return

    first_ok = next((entry for entry in results if entry["status"] == "ok"), None)
    mode = "unknown" if first_ok is None else str(first_ok["timing_mode"])

    headline = f"{spec.name}: {TIMING_SCOPE}"
    if mode == "hot-loop":
        # mode can only be "hot-loop" when a successful result exists.
        loop_count = int(first_ok["loop_count"])
        headline += f" (mode=hot-loop, loop_count={loop_count}, normalized_to={spec.loop_count})"
    print(headline)

    for result in results:
        status = result["status"]
        name = result["name"]
        if status != "ok":
            print(f"{name}: {status} ({result['reason']})")
            continue
        if result["timing_mode"] == "hot-loop":
            print(
                f"{name}: total_min={result['min_ms']:.3f}ms total_median={result['median_ms']:.3f}ms "
                f"total_max={result['max_ms']:.3f}ms normalized_median={result['normalized_median_ms']:.3f}ms"
            )
            continue
        print(
            f"{name}: min={result['min_ms']:.3f}ms median={result['median_ms']:.3f}ms max={result['max_ms']:.3f}ms"
        )
|
|
|
|
|
|
def run_command(command: list[str], stdin_text: str | None = None) -> subprocess.CompletedProcess[str]:
    """Run ``command`` and capture its stdout and stderr as text.

    Args:
        command: Program and arguments; executed without a shell.
        stdin_text: Text piped to the process on stdin, or ``None`` for none.

    Returns:
        The completed process. A non-zero exit status does not raise. If the
        program cannot be started at all (missing file, no permission), a
        synthetic result with return code 127, empty stdout and the error
        text in stderr is returned, so callers report an ordinary failure
        instead of aborting the whole benchmark run.
    """
    try:
        return subprocess.run(
            command,
            input=stdin_text,
            text=True,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            check=False,
        )
    except OSError as error:
        # 127 is the conventional shell status for "command not found".
        return subprocess.CompletedProcess(command, 127, stdout="", stderr=str(error))
|
|
|
|
|
|
def normalized_stdout(stdout: str) -> str:
    """Return the last non-blank line of ``stdout``, stripped, or ``""`` if there is none."""
    for raw_line in reversed(stdout.splitlines()):
        text = raw_line.strip()
        if text:
            return text
    return ""
|
|
|
|
|
|
def run_parameters(spec: BenchmarkSpec, mode: str) -> RunParameters:
    """Select the loop count, checksum and stdin text for ``mode``.

    ``"hot-loop"`` selects the spec's hot values; any other mode yields
    cold-process parameters. The base loop count is ``spec.loop_count`` in
    both cases.
    """
    hot = mode == "hot-loop"
    return RunParameters(
        mode=mode if hot else "cold-process",
        loop_count=spec.hot_loop_count if hot else spec.loop_count,
        expected_checksum=spec.hot_expected_checksum if hot else spec.expected_checksum,
        stdin_text=spec.hot_stdin_text if hot else spec.stdin_text,
        base_loop_count=spec.loop_count,
    )
|
|
|
|
|
|
def run_succeeded_for_params(process: subprocess.CompletedProcess[str], params: RunParameters) -> bool:
    """Decide whether a finished run counts as successful.

    The last non-blank stdout line must equal ``params.expected_checksum``.
    In ``"hot-loop"`` mode that alone is sufficient; in any other mode the
    process must also have exited with status 0.
    """
    checksum_matches = normalized_stdout(process.stdout) == params.expected_checksum
    if not checksum_matches:
        return False
    # The exit status is deliberately not consulted for hot-loop runs.
    return params.mode == "hot-loop" or process.returncode == 0
|
|
|
|
|
|
def ns_to_ms(value: int) -> float:
    """Convert a duration in nanoseconds to milliseconds."""
    nanoseconds_per_millisecond = 1_000_000.0
    return value / nanoseconds_per_millisecond
|
|
|
|
|
|
def first_available(candidates: list[str]) -> str | None:
    """Return the resolved path of the first candidate found on ``PATH``, or ``None``."""
    resolved = (shutil.which(candidate) for candidate in candidates)
    return next((path for path in resolved if path), None)
|
|
|
|
|
|
def format_command(command: list[str]) -> str:
    """Render ``command`` as one space-separated string, quoting each part as needed."""
    quoted_parts = map(shlex_quote, command)
    return " ".join(quoted_parts)
|
|
|
|
|
|
def shlex_quote(value: str) -> str:
    """Quote ``value`` for display in a shell-style command line.

    Non-empty strings made only of alphanumerics and ``/._:-`` are returned
    unchanged. Everything else is wrapped in single quotes, with embedded
    single quotes rewritten as ``'"'"'``.
    """
    safe_punctuation = "/._:-"
    needs_quoting = not value or any(
        not (char.isalnum() or char in safe_punctuation) for char in value
    )
    if not needs_quoting:
        return value
    escaped = value.replace("'", "'\"'\"'")
    return f"'{escaped}'"
|