"""Run end-to-end benchmarks on real corpora with variance analysis."""
|
|
|
|
import argparse
|
|
import csv
|
|
import json
|
|
import random
|
|
import sys
|
|
import time
|
|
from collections import defaultdict
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
import numpy as np
|
|
|
|
sys.path.insert(0, str(Path(__file__).parent.parent))
|
|
|
|
from llmds.data_sources.beir_loader import load_beir
|
|
from llmds.data_sources.amazon_reviews import load_amazon_reviews
|
|
from llmds.retrieval_pipeline import RetrievalPipeline
|
|
from llmds.utils import Timer, memory_profiler, calculate_statistics
|
|
|
|
|
|


def aggregate_repetitions(results: list[dict]) -> dict[str, Any]:
    """
    Aggregate results across repetitions with variance analysis.

    Args:
        results: List of result dictionaries from multiple repetitions

    Returns:
        Dictionary with aggregated statistics including variance metrics
    """
    if not results:
        return {}

    # Extract metric names (all numeric keys except metadata)
    metadata_keys = {"corpus", "size", "ef_search", "M", "num_queries", "repetition"}
    metric_keys = [k for k in results[0].keys() if k not in metadata_keys]

    aggregated = {
        "corpus": results[0].get("corpus"),
        "size": results[0].get("size"),
        "ef_search": results[0].get("ef_search"),
        "M": results[0].get("M"),
        "num_queries": results[0].get("num_queries"),
        "repetitions": len(results),
    }
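
    # calculate_statistics (from llmds.utils) is assumed to return at least:
    # mean, std, min, max, ci_lower/ci_upper (confidence-interval bounds), and
    # cv (coefficient of variation, i.e. std/mean expressed as a percentage).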
    # Calculate statistics for each metric
    for metric in metric_keys:
        values = [r[metric] for r in results if metric in r]
        if values:
            stats_dict = calculate_statistics(values)
            # Store both mean/std and full statistics
            aggregated[f"{metric}_mean"] = stats_dict["mean"]
            aggregated[f"{metric}_std"] = stats_dict["std"]
            aggregated[f"{metric}_min"] = stats_dict["min"]
            aggregated[f"{metric}_max"] = stats_dict["max"]
            aggregated[f"{metric}_ci_lower"] = stats_dict["ci_lower"]
            aggregated[f"{metric}_ci_upper"] = stats_dict["ci_upper"]
            aggregated[f"{metric}_cv"] = stats_dict["cv"]  # Coefficient of variation

    # Identify flaky benchmarks (high variance):
    # mark as flaky if CV > 20% for critical metrics
    critical_metrics = ["search_p50_ms", "search_p95_ms", "qps"]
    flaky_metrics = []
    for metric in critical_metrics:
        cv_key = f"{metric}_cv"
        if cv_key in aggregated and aggregated[cv_key] > 20.0:
            flaky_metrics.append(metric)

    aggregated["flaky_metrics"] = flaky_metrics
    aggregated["is_flaky"] = len(flaky_metrics) > 0

    return aggregated
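

# Example (hypothetical numbers): three repetitions of one configuration
# collapse into per-metric statistics:
#
#   reps = [{"corpus": "beir", "size": 10_000, "ef_search": 50, "M": 16,
#            "num_queries": 100, "repetition": r, "qps": q}
#           for r, q in enumerate([950.0, 1010.0, 980.0])]
#   agg = aggregate_repetitions(reps)
#   # agg["qps_mean"], agg["qps_std"], agg["qps_cv"], agg["is_flaky"], ...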


def load_corpus_sample(corpus_file: Path, size: int, seed: int = 42) -> list[dict]:
    """Load a random sample of documents from a JSONL corpus."""
    random.seed(seed)
    np.random.seed(seed)

    all_docs = []
    with open(corpus_file, "r", encoding="utf-8") as f:
        for line in f:
            if line.strip():
                all_docs.append(json.loads(line))

    if len(all_docs) <= size:
        return all_docs

    # Sample without replacement
    return random.sample(all_docs, size)
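

# Corpus format (inferred from usage in this script): one JSON object per
# line, each with at least a "text" field, e.g.
#   {"_id": "doc-0", "text": "..."}   # "_id" shown for illustration only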


def run_benchmark(
    corpus_file: Path,
    emb_file: Path | None,
    corpus_name: str,
    size: int,
    ef_search: int,
    M: int,
    num_queries: int = 100,
    embedding_dim: int = 384,
) -> dict:
    """
    Run a single benchmark repetition on a corpus sample.

    Returns:
        Dictionary with latency, throughput, and memory results
    """
    print(f"\n=== Benchmarking {corpus_name} (size={size}, ef={ef_search}, M={M}) ===")

    # Load corpus sample
    print("Loading corpus sample...")
    docs = load_corpus_sample(corpus_file, size)
    print(f"Loaded {len(docs)} documents")

    # Load or generate embeddings
    if emb_file and emb_file.exists():
        embeddings = np.load(emb_file)
        # Keep only the sample's rows (rows are assumed to be aligned with
        # the corpus file's document order)
        embeddings = embeddings[:len(docs)]
    else:
        print("Generating deterministic embeddings...")
        rng = np.random.RandomState(42)
        embeddings = []
        for _ in range(len(docs)):
            emb = rng.randn(embedding_dim).astype(np.float32)
            emb = emb / np.linalg.norm(emb)
            embeddings.append(emb)
        embeddings = np.stack(embeddings)
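
    # Caveat: the fallback embeddings are random unit vectors, so latency and
    # memory figures remain meaningful but retrieval quality would not be.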

    # Build pipeline with deterministic seed
    print("Building pipeline...")
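
    # HNSW parameters, in the usual sense: M bounds per-node graph degree,
    # ef_search sets the candidate-list width at query time, and
    # ef_construction plays the same role during insertion. The 4x scaling of
    # ef_construction relative to ef_search is this script's heuristic, not a
    # library requirement.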

    # Memory profiling for build phase
    with memory_profiler() as mem_profiler:
        pipeline = RetrievalPipeline(
            embedding_dim=embedding_dim,
            hnsw_M=M,
            hnsw_ef_search=ef_search,
            hnsw_ef_construction=ef_search * 4,
            seed=42,  # Fixed seed for reproducible HNSW structure
        )

        # Add documents
        build_times = []
        for i, doc in enumerate(docs):
            with Timer() as t:
                pipeline.add_document(
                    doc_id=i,
                    text=doc["text"],
                    embedding=embeddings[i],
                )
            build_times.append(t.elapsed * 1000)
            # Sample memory periodically during build
            if (i + 1) % (len(docs) // 10 + 1) == 0:
                mem_profiler.sample()

    build_peak_rss_mb = mem_profiler.get_peak_rss_mb()
    build_memory_delta_mb = mem_profiler.get_memory_delta_mb()
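
    # Note: each per-document timing covers the full add_document call, and
    # memory is sampled roughly ten times over the build regardless of size.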

    # Run queries with memory profiling
    print(f"Running {num_queries} queries...")
    search_times = []
    rng = np.random.RandomState(42)

    # Generate query embeddings
    query_embeddings = []
    for _ in range(num_queries):
        qemb = rng.randn(embedding_dim).astype(np.float32)
        qemb = qemb / np.linalg.norm(qemb)
        query_embeddings.append(qemb)

    # Use document texts as queries (simplified; the random query embeddings
    # do not correspond to these texts)
    query_texts = [docs[i % len(docs)]["text"][:100] for i in range(num_queries)]

    # Memory profiling for search phase
    with memory_profiler() as search_mem_profiler:
        for i, (query_text, query_emb) in enumerate(zip(query_texts, query_embeddings)):
            with Timer() as t:
                pipeline.search(query_text, query_embedding=query_emb, top_k=10)
            search_times.append(t.elapsed * 1000)

            # Sample memory periodically during search
            if (i + 1) % 20 == 0:
                search_mem_profiler.sample()
                print(f"Completed {i + 1}/{num_queries} queries...")

    search_peak_rss_mb = search_mem_profiler.get_peak_rss_mb()
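
    # Note: Timer wraps only pipeline.search, so the periodic memory sampling
    # and progress prints do not inflate the recorded per-query latencies.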

    # Overall peak RSS (maximum of build and search phases)
    overall_peak_rss_mb = max(build_peak_rss_mb, search_peak_rss_mb)

    # Compute statistics
    build_times_sorted = sorted(build_times)
    search_times_sorted = sorted(search_times)
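
    # Percentiles below use the nearest-rank method on the sorted samples;
    # index int(n * q) is always in range for q < 1 and n >= 1.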
    results = {
        "corpus": corpus_name,
        "size": size,
        "ef_search": ef_search,
        "M": M,
        "num_queries": num_queries,
        "build_p50_ms": build_times_sorted[len(build_times_sorted) // 2],
        "build_p95_ms": build_times_sorted[int(len(build_times_sorted) * 0.95)],
        "build_p99_ms": build_times_sorted[int(len(build_times_sorted) * 0.99)],
        "search_p50_ms": search_times_sorted[len(search_times_sorted) // 2],
        "search_p95_ms": search_times_sorted[int(len(search_times_sorted) * 0.95)],
        "search_p99_ms": search_times_sorted[int(len(search_times_sorted) * 0.99)],
        "avg_build_time_ms": sum(build_times) / len(build_times),
        "avg_search_time_ms": sum(search_times) / len(search_times),
        "qps": 1000.0 / (sum(search_times) / len(search_times)) if search_times else 0.0,
        # Memory metrics
        "peak_rss_mb": overall_peak_rss_mb,
        "build_peak_rss_mb": build_peak_rss_mb,
        "build_memory_delta_mb": build_memory_delta_mb,
        "search_peak_rss_mb": search_peak_rss_mb,
    }
print(f"✓ Results: P50={results['search_p50_ms']:.2f}ms, P95={results['search_p95_ms']:.2f}ms, QPS={results['qps']:.2f}, Peak RSS={results['peak_rss_mb']:.2f}MB")
|
|
|
|
return results
|
|
|
|
|
|
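

# Example invocation (hypothetical paths; embeddings fall back to the
# deterministic random vectors when emb_file is None or missing):
#
#   result = run_benchmark(
#       corpus_file=Path("data/corpus.jsonl"),
#       emb_file=None,
#       corpus_name="demo",
#       size=10_000,
#       ef_search=50,
#       M=16,
#   )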


def main():
    parser = argparse.ArgumentParser(description="Run benchmarks on real corpora")
    parser.add_argument("--corpus", type=str, required=True, help="Corpus name")
    parser.add_argument("--corpus-file", type=Path, required=True, help="Corpus JSONL file")
    parser.add_argument("--emb-file", type=Path, help="Embeddings .npy file")
    parser.add_argument("--sizes", nargs="+", type=str, default=["10k"], help="Corpus sizes (e.g., 10k 50k 100k)")
    parser.add_argument("--ef", nargs="+", type=int, default=[50], help="HNSW efSearch values")
    parser.add_argument("--M", nargs="+", type=int, default=[16], help="HNSW M values")
    parser.add_argument("--num-queries", type=int, default=100, help="Number of queries")
    parser.add_argument("--repetitions", type=int, default=5, help="Number of repetitions for variance analysis (default: 5)")
    parser.add_argument("--output-dir", type=Path, default=Path("benchmarks/results"), help="Output directory")

    args = parser.parse_args()

    # Parse sizes such as "10k", "50k", "1m", or plain integers
    def parse_size(s: str) -> int:
        s = s.lower()
        if s.endswith("k"):
            return int(float(s[:-1]) * 1_000)
        elif s.endswith("m"):
            return int(float(s[:-1]) * 1_000_000)
        return int(s)
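
    # e.g. parse_size("10k") == 10_000, parse_size("1m") == 1_000_000,
    #      parse_size("500") == 500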

    sizes = [parse_size(s) for s in args.sizes]

    # Create output directory with timestamp
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    output_dir = args.output_dir / args.corpus / timestamp
    output_dir.mkdir(parents=True, exist_ok=True)

    all_results = []
    aggregated_results = []

    print(f"\n{'='*70}")
    print(f"Running benchmarks with {args.repetitions} repetitions per configuration")
    print(f"{'='*70}\n")

    # Run benchmarks
    for size in sizes:
        for ef in args.ef:
            for M in args.M:
                print(f"Configuration: size={size}, ef={ef}, M={M}")

                repetition_results = []
                for rep in range(args.repetitions):
                    print(f"  Repetition {rep + 1}/{args.repetitions}...", end=" ", flush=True)
                    result = run_benchmark(
                        corpus_file=args.corpus_file,
                        emb_file=args.emb_file,
                        corpus_name=args.corpus,
                        size=size,
                        ef_search=ef,
                        M=M,
                        num_queries=args.num_queries,
                    )
                    result["repetition"] = rep
                    repetition_results.append(result)
                    all_results.append(result)
                    print("✓")

                # Aggregate across repetitions
                aggregated = aggregate_repetitions(repetition_results)
                if aggregated:
                    # Keep original metrics for backward compatibility
                    for metric in ["search_p50_ms", "search_p95_ms", "search_p99_ms", "qps"]:
                        if f"{metric}_mean" in aggregated:
                            aggregated[metric] = aggregated[f"{metric}_mean"]

                    aggregated_results.append(aggregated)

                    # Print variance summary
                    print("\n  Variance Summary:")
                    print(f"    Search P50: {aggregated.get('search_p50_ms_mean', 0):.2f} ± {aggregated.get('search_p50_ms_std', 0):.2f} ms (CV: {aggregated.get('search_p50_ms_cv', 0):.1f}%)")
                    print(f"    Search P95: {aggregated.get('search_p95_ms_mean', 0):.2f} ± {aggregated.get('search_p95_ms_std', 0):.2f} ms (CV: {aggregated.get('search_p95_ms_cv', 0):.1f}%)")
                    print(f"    QPS: {aggregated.get('qps_mean', 0):.2f} ± {aggregated.get('qps_std', 0):.2f} (CV: {aggregated.get('qps_cv', 0):.1f}%)")

                    if aggregated.get("is_flaky", False):
                        print(f"    ⚠️ FLAKY: High variance detected in {', '.join(aggregated.get('flaky_metrics', []))}")
                    print()

    # Save detailed results (all repetitions)
    results_file = output_dir / "results.json"
    with open(results_file, "w") as f:
        json.dump(all_results, f, indent=2)

    # Save aggregated results with variance statistics
    aggregated_file = output_dir / "results_aggregated.json"
    with open(aggregated_file, "w") as f:
        json.dump(aggregated_results, f, indent=2)

    # Save CSV with all repetitions
    csv_file = output_dir / "results.csv"
    if all_results:
        fieldnames = list(all_results[0].keys())
        with open(csv_file, "w", newline="") as f:
            writer = csv.DictWriter(f, fieldnames=fieldnames)
            writer.writeheader()
            writer.writerows(all_results)

    # Save aggregated CSV
    aggregated_csv_file = output_dir / "results_aggregated.csv"
    if aggregated_results:
        agg_fieldnames = list(aggregated_results[0].keys())
        with open(aggregated_csv_file, "w", newline="") as f:
            writer = csv.DictWriter(f, fieldnames=agg_fieldnames)
            writer.writeheader()
            writer.writerows(aggregated_results)

    # Print summary
    print(f"\n{'='*70}")
    print("Benchmark Summary")
    print(f"{'='*70}")
    print(f"Total configurations: {len(aggregated_results)}")
    print(f"Total repetitions: {len(all_results)}")
    flaky_count = sum(1 for r in aggregated_results if r.get("is_flaky", False))
    if flaky_count > 0:
        print(f"⚠️ Flaky configurations: {flaky_count}")
    print("\nResults saved to:")
    print(f"  - Detailed: {results_file}")
    print(f"  - Aggregated: {aggregated_file}")
    print(f"  - CSV: {csv_file}")
    print(f"  - Aggregated CSV: {aggregated_csv_file}")
    print(f"{'='*70}\n")


if __name__ == "__main__":
    main()