"""Profile tail latency breakdown for retrieval pipeline.

This script profiles latency components to identify bottlenecks causing
extreme P99 tail latencies.
"""

import argparse
import contextlib
import cProfile
import pstats
import statistics
import time
from pathlib import Path
from typing import Dict, List

import numpy as np

from llmds.hnsw import HNSW
from llmds.retrieval_pipeline import RetrievalPipeline


def _percentile(sorted_times: List[float], fraction: float) -> float:
    """Return the latency at *fraction* of a pre-sorted sample.

    Uses the same truncating-index convention as the original inline code
    (``int(len * fraction)``), clamped so a fraction of 1.0 cannot overrun.
    """
    idx = min(int(len(sorted_times) * fraction), len(sorted_times) - 1)
    return sorted_times[idx]


def _print_profile(profiler: cProfile.Profile, top: int = 20) -> None:
    """Dump the *top* functions by cumulative time from a finished profiler."""
    stats = pstats.Stats(profiler)
    stats.sort_stats("cumulative")
    print(f"\nTop {top} functions by cumulative time:")
    print("=" * 80)
    stats.print_stats(top)


def profile_hnsw_search(num_vectors: int = 10000, dim: int = 128, num_queries: int = 1000) -> Dict[str, float]:
    """Profile HNSW search operations.

    Builds an index of normalized random vectors, times ``num_queries``
    searches under cProfile, and prints percentile statistics plus an
    analysis of tail outliers.

    Args:
        num_vectors: Number of vectors to index.
        dim: Vector dimensionality.
        num_queries: Number of timed search queries.

    Returns:
        Dict of latency statistics in milliseconds plus outlier counts.
    """
    print(f"Profiling HNSW search with {num_vectors} vectors, dim={dim}, {num_queries} queries...")

    np.random.seed(42)
    hnsw = HNSW(dim=dim, M=16, ef_construction=200, ef_search=50, seed=42)

    # Build index from unit-normalized random vectors.
    for i in range(num_vectors):
        vec = np.random.randn(dim).astype(np.float32)
        vec = vec / np.linalg.norm(vec)
        hnsw.add(vec, i)

    # Profile search operations; each query is also wall-clock timed
    # individually so we can compute the latency distribution.
    profiler = cProfile.Profile()
    profiler.enable()

    search_times: List[float] = []
    for _ in range(num_queries):
        query = np.random.randn(dim).astype(np.float32)
        query = query / np.linalg.norm(query)
        start = time.perf_counter()
        hnsw.search(query, k=10)
        elapsed = time.perf_counter() - start
        search_times.append(elapsed * 1000)  # Convert to ms

    profiler.disable()

    # Compute latency statistics on the sorted sample.
    search_times.sort()
    p50 = _percentile(search_times, 0.50)
    p95 = _percentile(search_times, 0.95)
    p99 = _percentile(search_times, 0.99)
    # P99.9 is only meaningful with >= 1000 samples; fall back to P99.
    p99_9 = _percentile(search_times, 0.999) if len(search_times) >= 1000 else p99

    print("\nHNSW Search Latency Statistics:")
    print(f"  P50: {p50:.3f} ms")
    print(f"  P95: {p95:.3f} ms")
    print(f"  P99: {p99:.3f} ms")
    print(f"  P99.9: {p99_9:.3f} ms")
    print(f"  Mean: {statistics.mean(search_times):.3f} ms")
    print(f"  Max: {max(search_times):.3f} ms")

    # Analyze P99 outliers: anything slower than 2x P95 counts.
    threshold = p95 * 2  # Outliers are 2x P95
    outliers = [t for t in search_times if t > threshold]
    if outliers:
        print(f"\n  Outliers (>2x P95): {len(outliers)} queries ({len(outliers)/len(search_times)*100:.1f}%)")
        print(f"  Outlier P50: {statistics.median(outliers):.3f} ms")
        print(f"  Outlier Max: {max(outliers):.3f} ms")

    # Generate profiling report.
    _print_profile(profiler)

    return {
        "p50_ms": p50,
        "p95_ms": p95,
        "p99_ms": p99,
        "p99_9_ms": p99_9,
        "mean_ms": statistics.mean(search_times),
        "max_ms": max(search_times),
        "outlier_count": len(outliers),
        "outlier_percent": len(outliers) / len(search_times) * 100 if search_times else 0,
    }


def profile_retrieval_pipeline(num_docs: int = 5000, num_queries: int = 500) -> Dict[str, float]:
    """Profile complete retrieval pipeline.

    Indexes ``num_docs`` synthetic documents, times ``num_queries``
    hybrid searches under cProfile, and prints percentile statistics.

    Args:
        num_docs: Number of documents to index.
        num_queries: Number of timed search queries.

    Returns:
        Dict of latency statistics in milliseconds.
    """
    print(f"\nProfiling RetrievalPipeline with {num_docs} docs, {num_queries} queries...")

    np.random.seed(42)
    rng = np.random.RandomState(42)  # renamed from `random` to avoid shadowing the stdlib module
    pipeline = RetrievalPipeline(embedding_dim=128, seed=42)

    # Build index of synthetic documents with normalized embeddings.
    for i in range(num_docs):
        text = f"document {i} about topic {i % 10}"
        embedding = rng.randn(128).astype(np.float32)
        embedding = embedding / np.linalg.norm(embedding)
        pipeline.add_document(doc_id=i, text=text, embedding=embedding)

    # Profile search operations.
    profiler = cProfile.Profile()
    profiler.enable()

    search_times: List[float] = []
    for _ in range(num_queries):
        query_text = "document topic"
        query_embedding = rng.randn(128).astype(np.float32)
        query_embedding = query_embedding / np.linalg.norm(query_embedding)
        start = time.perf_counter()
        pipeline.search(query_text, query_embedding=query_embedding, top_k=10)
        elapsed = time.perf_counter() - start
        search_times.append(elapsed * 1000)  # Convert to ms

    profiler.disable()

    # Compute latency statistics on the sorted sample.
    search_times.sort()
    p50 = _percentile(search_times, 0.50)
    p95 = _percentile(search_times, 0.95)
    p99 = _percentile(search_times, 0.99)

    print("\nRetrieval Pipeline Latency Statistics:")
    print(f"  P50: {p50:.3f} ms")
    print(f"  P95: {p95:.3f} ms")
    print(f"  P99: {p99:.3f} ms")
    print(f"  Mean: {statistics.mean(search_times):.3f} ms")
    print(f"  Max: {max(search_times):.3f} ms")

    # Generate profiling report.
    _print_profile(profiler)

    return {
        "p50_ms": p50,
        "p95_ms": p95,
        "p99_ms": p99,
        "mean_ms": statistics.mean(search_times),
        "max_ms": max(search_times),
    }


def profile_latency_breakdown(num_vectors: int = 5000, dim: int = 128) -> None:
    """Profile latency breakdown by component.

    Compares the cost of raw brute-force distance computations (over the
    first 100 indexed vectors) against full HNSW searches to show how much
    of search latency is distance math vs. graph traversal.

    Args:
        num_vectors: Number of vectors to index.
        dim: Vector dimensionality.
    """
    print(f"\nProfiling latency breakdown with {num_vectors} vectors...")

    np.random.seed(42)
    hnsw = HNSW(dim=dim, M=16, ef_construction=200, ef_search=50, seed=42)

    # Build index, keeping the raw vectors for the brute-force comparison.
    vectors = []
    for i in range(num_vectors):
        vec = np.random.randn(dim).astype(np.float32)
        vec = vec / np.linalg.norm(vec)
        vectors.append(vec)
        hnsw.add(vec, i)

    # Time individual operations over 100 sample queries.
    search_times: List[float] = []
    distance_computation_times: List[float] = []

    for _ in range(100):
        query = np.random.randn(dim).astype(np.float32)
        query = query / np.linalg.norm(query)

        # Profile distance computations (brute-force over 100 vectors).
        dist_start = time.perf_counter()
        [np.linalg.norm(query - vec) for vec in vectors[:100]]
        dist_time = (time.perf_counter() - dist_start) * 1000
        distance_computation_times.append(dist_time)

        # Profile search.
        search_start = time.perf_counter()
        hnsw.search(query, k=10)
        search_time = (time.perf_counter() - search_start) * 1000
        search_times.append(search_time)

    print("\nLatency Breakdown:")
    print(f"  Distance computation: {statistics.mean(distance_computation_times):.3f} ms (mean)")
    print(f"  HNSW search: {statistics.mean(search_times):.3f} ms (mean)")
    print(f"  Search/Distance ratio: {statistics.mean(search_times) / statistics.mean(distance_computation_times):.2f}x")


def main() -> None:
    """Run all profiling tasks and write the report to ``--output``."""
    parser = argparse.ArgumentParser(description="Profile tail latency")
    parser.add_argument("--output", type=Path, default=Path("audit/tail_latency_profile.txt"),
                        help="Output file for profiling report")
    parser.add_argument("--num-vectors", type=int, default=10000,
                        help="Number of vectors for HNSW profiling")
    parser.add_argument("--num-docs", type=int, default=5000,
                        help="Number of documents for pipeline profiling")
    parser.add_argument("--num-queries", type=int, default=1000,
                        help="Number of queries to run")
    args = parser.parse_args()

    args.output.parent.mkdir(parents=True, exist_ok=True)

    # Send all report prints into the output file. redirect_stdout restores
    # sys.stdout even if profiling raises (replaces the manual swap/finally).
    with open(args.output, "w") as f, contextlib.redirect_stdout(f):
        # Profile HNSW.
        hnsw_stats = profile_hnsw_search(args.num_vectors, 128, args.num_queries)
        # Profile pipeline (half as many queries as the HNSW run).
        pipeline_stats = profile_retrieval_pipeline(args.num_docs, args.num_queries // 2)
        # Breakdown.
        profile_latency_breakdown(args.num_vectors, 128)

    print(f"\nProfiling complete. Report saved to: {args.output}")
    print("\nKey Findings:")
    print(f"  HNSW P99: {hnsw_stats['p99_ms']:.3f} ms")
    print(f"  Pipeline P99: {pipeline_stats['p99_ms']:.3f} ms")
    if hnsw_stats.get("outlier_count", 0) > 0:
        print(f"  HNSW Outliers: {hnsw_stats['outlier_count']} ({hnsw_stats['outlier_percent']:.1f}%)")


if __name__ == "__main__":
    main()