Files
llm-rag-ds-optimizer/scripts/run_benchmarks.py

356 lines
14 KiB
Python

"""Run end-to-end benchmarks on real corpora with variance analysis."""
import argparse
import csv
import json
import random
import sys
import time
from collections import defaultdict
from datetime import datetime
from pathlib import Path
from typing import Any
import numpy as np
sys.path.insert(0, str(Path(__file__).parent.parent))
from llmds.data_sources.beir_loader import load_beir
from llmds.data_sources.amazon_reviews import load_amazon_reviews
from llmds.retrieval_pipeline import RetrievalPipeline
from llmds.utils import Timer, memory_profiler, calculate_statistics
def aggregate_repetitions(results: list[dict]) -> dict[str, Any]:
    """Aggregate benchmark results across repetitions with variance analysis.

    Args:
        results: Result dictionaries from multiple repetitions of a single
            configuration (same corpus/size/ef_search/M).

    Returns:
        Dictionary with mean/std/min/max/confidence-interval/CV statistics
        per metric plus flakiness flags; empty dict when ``results`` is empty.
    """
    if not results:
        return {}

    # Configuration/identity keys — everything else is a measured metric.
    metadata_keys = {"corpus", "size", "ef_search", "M", "num_queries", "repetition"}
    # Collect metric names across ALL repetitions, in first-seen order.
    # (Previously only results[0] was inspected, so a metric missing from the
    # first repetition was silently dropped from the aggregation.)
    metric_keys: list[str] = []
    seen: set[str] = set()
    for rep in results:
        for key in rep:
            if key not in metadata_keys and key not in seen:
                seen.add(key)
                metric_keys.append(key)

    aggregated: dict[str, Any] = {
        "corpus": results[0].get("corpus"),
        "size": results[0].get("size"),
        "ef_search": results[0].get("ef_search"),
        "M": results[0].get("M"),
        "num_queries": results[0].get("num_queries"),
        "repetitions": len(results),
    }

    # Per-metric statistics; calculate_statistics (llmds.utils) is expected to
    # return mean/std/min/max/ci_lower/ci_upper/cv — TODO confirm its contract.
    for metric in metric_keys:
        values = [rep[metric] for rep in results if metric in rep]
        if values:
            stats_dict = calculate_statistics(values)
            aggregated[f"{metric}_mean"] = stats_dict["mean"]
            aggregated[f"{metric}_std"] = stats_dict["std"]
            aggregated[f"{metric}_min"] = stats_dict["min"]
            aggregated[f"{metric}_max"] = stats_dict["max"]
            aggregated[f"{metric}_ci_lower"] = stats_dict["ci_lower"]
            aggregated[f"{metric}_ci_upper"] = stats_dict["ci_upper"]
            aggregated[f"{metric}_cv"] = stats_dict["cv"]  # Coefficient of variation

    # Identify flaky benchmarks: CV > 20% on any critical latency/throughput metric.
    critical_metrics = ["search_p50_ms", "search_p95_ms", "qps"]
    flaky_metrics = []
    for metric in critical_metrics:
        cv_key = f"{metric}_cv"
        if cv_key in aggregated and aggregated[cv_key] > 20.0:
            flaky_metrics.append(metric)
    aggregated["flaky_metrics"] = flaky_metrics
    aggregated["is_flaky"] = len(flaky_metrics) > 0
    return aggregated
def load_corpus_sample(corpus_file: Path, size: int, seed: int = 42) -> list[dict]:
    """Load up to ``size`` documents sampled uniformly from a JSONL corpus.

    Args:
        corpus_file: Path to a JSONL file (one JSON document per line;
            blank lines are skipped).
        size: Maximum number of documents to return.
        seed: RNG seed for reproducible sampling.

    Returns:
        All documents when the corpus has at most ``size`` entries, otherwise
        a uniform sample of ``size`` documents without replacement.
    """
    all_docs = []
    with open(corpus_file, "r", encoding="utf-8") as f:
        for line in f:
            if line.strip():
                all_docs.append(json.loads(line))
    if len(all_docs) <= size:
        return all_docs
    # Sample without replacement using a dedicated seeded RNG. This yields the
    # same sequence as the previous ``random.seed(seed); random.sample(...)``
    # but no longer clobbers the global ``random``/``numpy`` RNG state as a
    # side effect (run_benchmark seeds its own RandomState independently).
    return random.Random(seed).sample(all_docs, size)
def run_benchmark(
    corpus_file: Path,
    emb_file: Path | None,
    corpus_name: str,
    size: int,
    ef_search: int,
    M: int,
    num_queries: int = 100,
    embedding_dim: int = 384,
) -> dict:
    """Run one benchmark repetition on a corpus sample.

    Builds a RetrievalPipeline over a sampled corpus, times every document
    insert and every search, and profiles RSS during both phases.

    Args:
        corpus_file: JSONL corpus file (one document object per line).
        emb_file: Optional ``.npy`` file with precomputed embeddings; when
            absent or missing, deterministic random unit vectors are used.
        corpus_name: Corpus label recorded in the result row.
        size: Number of documents to sample from the corpus.
        ef_search: HNSW efSearch parameter.
        M: HNSW M parameter.
        num_queries: Number of search queries to time.
        embedding_dim: Embedding dimensionality.

    Returns:
        Dictionary with build/search latency percentiles, throughput (qps),
        and memory metrics for this repetition.
    """

    def percentile(sorted_values: list[float], q: float) -> float:
        # Nearest-rank percentile on a pre-sorted list. Safe for empty input
        # (the original indexed int(len * q) directly, which raises on []).
        if not sorted_values:
            return 0.0
        idx = min(int(len(sorted_values) * q), len(sorted_values) - 1)
        return sorted_values[idx]

    print(f"\n=== Benchmarking {corpus_name} (size={size}, ef={ef_search}, M={M}) ===")

    # Load corpus sample
    print("Loading corpus sample...")
    docs = load_corpus_sample(corpus_file, size)
    print(f"Loaded {len(docs)} documents")

    # Load or generate embeddings
    if emb_file and emb_file.exists():
        embeddings = np.load(emb_file)
        # NOTE(review): docs are a *random* sample, yet we take the first
        # len(docs) embedding rows — rows may not correspond to the sampled
        # documents. Confirm against how the .npy file was produced.
        embeddings = embeddings[:len(docs)]
    else:
        print("Generating deterministic embeddings...")
        rng = np.random.RandomState(42)  # fixed seed => reproducible vectors
        embeddings = []
        for _ in range(len(docs)):
            emb = rng.randn(embedding_dim).astype(np.float32)
            emb = emb / np.linalg.norm(emb)  # unit-normalize each vector
            embeddings.append(emb)
        embeddings = np.stack(embeddings)

    # Build pipeline with deterministic seed, profiling memory during build.
    print("Building pipeline...")
    with memory_profiler() as mem_profiler:
        pipeline = RetrievalPipeline(
            embedding_dim=embedding_dim,
            hnsw_M=M,
            hnsw_ef_search=ef_search,
            hnsw_ef_construction=ef_search * 4,
            seed=42,  # Fixed seed for reproducible HNSW structure
        )
        build_times = []
        # Sample memory roughly 10 times over the whole build (invariant hoisted).
        sample_every = len(docs) // 10 + 1
        for i, doc in enumerate(docs):
            with Timer() as t:
                pipeline.add_document(
                    doc_id=i,
                    text=doc["text"],
                    embedding=embeddings[i],
                )
            build_times.append(t.elapsed * 1000)
            if (i + 1) % sample_every == 0:
                mem_profiler.sample()
    build_peak_rss_mb = mem_profiler.get_peak_rss_mb()
    build_memory_delta_mb = mem_profiler.get_memory_delta_mb()

    # Generate deterministic query embeddings.
    print(f"Running {num_queries} queries...")
    search_times = []
    rng = np.random.RandomState(42)
    query_embeddings = []
    for _ in range(num_queries):
        qemb = rng.randn(embedding_dim).astype(np.float32)
        qemb = qemb / np.linalg.norm(qemb)
        query_embeddings.append(qemb)
    # Use document texts as queries (simplified). Guard the modulo against an
    # empty corpus, which previously raised ZeroDivisionError.
    query_texts = (
        [docs[i % len(docs)]["text"][:100] for i in range(num_queries)] if docs else []
    )

    # Memory profiling for search phase
    with memory_profiler() as search_mem_profiler:
        for i, (query_text, query_emb) in enumerate(zip(query_texts, query_embeddings)):
            with Timer() as t:
                pipeline.search(query_text, query_embedding=query_emb, top_k=10)
            search_times.append(t.elapsed * 1000)
            # Sample memory (and report progress) every 20 queries.
            if (i + 1) % 20 == 0:
                search_mem_profiler.sample()
                print(f"Completed {i + 1}/{num_queries} queries...")
    search_peak_rss_mb = search_mem_profiler.get_peak_rss_mb()

    # Overall peak RSS (maximum of build and search phases)
    overall_peak_rss_mb = max(build_peak_rss_mb, search_peak_rss_mb)

    # Latency statistics; averages guarded so empty runs yield 0.0 rather
    # than ZeroDivisionError.
    build_times_sorted = sorted(build_times)
    search_times_sorted = sorted(search_times)
    avg_build_ms = sum(build_times) / len(build_times) if build_times else 0.0
    avg_search_ms = sum(search_times) / len(search_times) if search_times else 0.0
    results = {
        "corpus": corpus_name,
        "size": size,
        "ef_search": ef_search,
        "M": M,
        "num_queries": num_queries,
        "build_p50_ms": percentile(build_times_sorted, 0.50),
        "build_p95_ms": percentile(build_times_sorted, 0.95),
        "build_p99_ms": percentile(build_times_sorted, 0.99),
        "search_p50_ms": percentile(search_times_sorted, 0.50),
        "search_p95_ms": percentile(search_times_sorted, 0.95),
        "search_p99_ms": percentile(search_times_sorted, 0.99),
        "avg_build_time_ms": avg_build_ms,
        "avg_search_time_ms": avg_search_ms,
        "qps": 1000.0 / avg_search_ms if avg_search_ms > 0 else 0.0,
        # Memory metrics
        "peak_rss_mb": overall_peak_rss_mb,
        "build_peak_rss_mb": build_peak_rss_mb,
        "build_memory_delta_mb": build_memory_delta_mb,
        "search_peak_rss_mb": search_peak_rss_mb,
    }
    print(f"✓ Results: P50={results['search_p50_ms']:.2f}ms, P95={results['search_p95_ms']:.2f}ms, QPS={results['qps']:.2f}, Peak RSS={results['peak_rss_mb']:.2f}MB")
    return results
def main():
    """CLI entry point: run the size/ef/M benchmark grid with repetitions and
    save detailed and variance-aggregated results as JSON and CSV."""
    parser = argparse.ArgumentParser(description="Run benchmarks on real corpora")
    parser.add_argument("--corpus", type=str, required=True, help="Corpus name")
    parser.add_argument("--corpus-file", type=Path, required=True, help="Corpus JSONL file")
    parser.add_argument("--emb-file", type=Path, help="Embeddings .npy file")
    parser.add_argument("--sizes", nargs="+", type=str, default=["10k"], help="Corpus sizes (e.g., 10k 50k 100k)")
    parser.add_argument("--ef", nargs="+", type=int, default=[50], help="HNSW efSearch values")
    parser.add_argument("--M", nargs="+", type=int, default=[16], help="HNSW M values")
    parser.add_argument("--num-queries", type=int, default=100, help="Number of queries")
    parser.add_argument("--repetitions", type=int, default=5, help="Number of repetitions for variance analysis (default: 5)")
    parser.add_argument("--output-dir", type=Path, default=Path("benchmarks/results"), help="Output directory")
    args = parser.parse_args()

    def parse_size(s: str) -> int:
        """Parse human-friendly sizes: '10k' -> 10000, '1.5m' -> 1500000."""
        s = s.lower()
        # float() so fractional sizes like "1.5k" also parse (was int-only).
        if s.endswith("k"):
            return int(float(s[:-1]) * 1000)
        if s.endswith("m"):
            return int(float(s[:-1]) * 1000000)
        return int(s)

    sizes = [parse_size(s) for s in args.sizes]

    # Timestamped output directory so runs never clobber each other.
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    output_dir = args.output_dir / args.corpus / timestamp
    output_dir.mkdir(parents=True, exist_ok=True)

    all_results = []         # every repetition of every configuration
    aggregated_results = []  # one variance-aggregated row per configuration

    print(f"\n{'='*70}")
    print(f"Running benchmarks with {args.repetitions} repetitions per configuration")
    print(f"{'='*70}\n")

    # Full grid: sizes x ef x M, each repeated args.repetitions times.
    for size in sizes:
        for ef in args.ef:
            for M in args.M:
                print(f"Configuration: size={size}, ef={ef}, M={M}")
                repetition_results = []
                for rep in range(args.repetitions):
                    print(f" Repetition {rep + 1}/{args.repetitions}...", end=" ", flush=True)
                    result = run_benchmark(
                        corpus_file=args.corpus_file,
                        emb_file=args.emb_file,
                        corpus_name=args.corpus,
                        size=size,
                        ef_search=ef,
                        M=M,
                        num_queries=args.num_queries,
                    )
                    result["repetition"] = rep
                    repetition_results.append(result)
                    all_results.append(result)
                    print("")
                # Aggregate across repetitions
                aggregated = aggregate_repetitions(repetition_results)
                if aggregated:
                    # Keep original metric names for backward compatibility
                    # (downstream readers expect e.g. "search_p50_ms").
                    for metric in ["search_p50_ms", "search_p95_ms", "search_p99_ms", "qps"]:
                        if f"{metric}_mean" in aggregated:
                            aggregated[metric] = aggregated[f"{metric}_mean"]
                    aggregated_results.append(aggregated)
                    # Print variance summary
                    print("\n Variance Summary:")
                    print(f" Search P50: {aggregated.get('search_p50_ms_mean', 0):.2f} ± {aggregated.get('search_p50_ms_std', 0):.2f} ms (CV: {aggregated.get('search_p50_ms_cv', 0):.1f}%)")
                    print(f" Search P95: {aggregated.get('search_p95_ms_mean', 0):.2f} ± {aggregated.get('search_p95_ms_std', 0):.2f} ms (CV: {aggregated.get('search_p95_ms_cv', 0):.1f}%)")
                    print(f" QPS: {aggregated.get('qps_mean', 0):.2f} ± {aggregated.get('qps_std', 0):.2f} (CV: {aggregated.get('qps_cv', 0):.1f}%)")
                    if aggregated.get("is_flaky", False):
                        print(f" ⚠️ FLAKY: High variance detected in {', '.join(aggregated.get('flaky_metrics', []))}")
                print()

    # Save detailed results (all repetitions)
    results_file = output_dir / "results.json"
    with open(results_file, "w") as f:
        json.dump(all_results, f, indent=2)

    # Save aggregated results with variance statistics
    aggregated_file = output_dir / "results_aggregated.json"
    with open(aggregated_file, "w") as f:
        json.dump(aggregated_results, f, indent=2)

    # Save CSV with all repetitions. Field names come from the first row;
    # run_benchmark emits a fixed key set, so all rows share one schema.
    csv_file = output_dir / "results.csv"
    if all_results:
        fieldnames = list(all_results[0].keys())
        with open(csv_file, "w", newline="") as f:
            writer = csv.DictWriter(f, fieldnames=fieldnames)
            writer.writeheader()
            writer.writerows(all_results)

    # Save aggregated CSV
    aggregated_csv_file = output_dir / "results_aggregated.csv"
    if aggregated_results:
        agg_fieldnames = list(aggregated_results[0].keys())
        with open(aggregated_csv_file, "w", newline="") as f:
            writer = csv.DictWriter(f, fieldnames=agg_fieldnames)
            writer.writeheader()
            writer.writerows(aggregated_results)

    # Print summary
    print(f"\n{'='*70}")
    print("Benchmark Summary")
    print(f"{'='*70}")
    print(f"Total configurations: {len(aggregated_results)}")
    print(f"Total repetitions: {len(all_results)}")
    flaky_count = sum(1 for r in aggregated_results if r.get("is_flaky", False))
    if flaky_count > 0:
        print(f"⚠️ Flaky configurations: {flaky_count}")
    print("\nResults saved to:")
    print(f" - Detailed: {results_file}")
    print(f" - Aggregated: {aggregated_file}")
    print(f" - CSV: {csv_file}")
    print(f" - Aggregated CSV: {aggregated_csv_file}")
    print(f"{'='*70}\n")
if __name__ == "__main__":
main()