Files
llm-rag-ds-optimizer/scripts/profile_tail_latency.py

248 lines
8.3 KiB
Python

"""Profile tail latency breakdown for retrieval pipeline.
This script profiles latency components to identify bottlenecks causing
extreme P99 tail latencies.
"""
import cProfile
import pstats
import statistics
from pathlib import Path
from typing import Dict, List
import numpy as np
from llmds.hnsw import HNSW
from llmds.retrieval_pipeline import RetrievalPipeline
def profile_hnsw_search(num_vectors: int = 10000, dim: int = 128, num_queries: int = 1000):
    """Profile HNSW search and print a latency-percentile breakdown.

    Builds an index of unit-normalized random vectors, times ``num_queries``
    searches, prints P50/P95/P99/P99.9 statistics plus a cProfile report,
    and flags tail outliers (>2x P95).

    Args:
        num_vectors: Number of vectors inserted into the index.
        dim: Vector dimensionality.
        num_queries: Number of timed search queries.

    Returns:
        Dict with keys ``p50_ms``, ``p95_ms``, ``p99_ms``, ``p99_9_ms``,
        ``mean_ms``, ``max_ms``, ``outlier_count``, ``outlier_percent``.
    """
    import time  # hoisted: the original re-imported this on every loop iteration

    print(f"Profiling HNSW search with {num_vectors} vectors, dim={dim}, {num_queries} queries...")
    np.random.seed(42)
    hnsw = HNSW(dim=dim, M=16, ef_construction=200, ef_search=50, seed=42)

    # Build the index (the original also kept an unused `vectors` list).
    for i in range(num_vectors):
        vec = np.random.randn(dim).astype(np.float32)
        hnsw.add(vec / np.linalg.norm(vec), i)

    # Pre-generate queries so the cProfile report covers only search calls,
    # not query construction/normalization (RNG sequence is unchanged).
    queries = []
    for _ in range(num_queries):
        q = np.random.randn(dim).astype(np.float32)
        queries.append(q / np.linalg.norm(q))

    profiler = cProfile.Profile()
    profiler.enable()
    search_times = []
    for query in queries:
        start = time.perf_counter()
        hnsw.search(query, k=10)
        elapsed = time.perf_counter() - start
        search_times.append(elapsed * 1000)  # Convert to ms
    profiler.disable()

    # Percentiles via sorted-index lookup (same methodology as before).
    search_times.sort()
    p50 = search_times[len(search_times) // 2]
    p95 = search_times[int(len(search_times) * 0.95)]
    p99 = search_times[int(len(search_times) * 0.99)]
    # P99.9 needs >= 1000 samples to be meaningful; otherwise fall back to P99.
    p99_9 = search_times[int(len(search_times) * 0.999)] if len(search_times) >= 1000 else p99

    print("\nHNSW Search Latency Statistics:")
    print(f" P50: {p50:.3f} ms")
    print(f" P95: {p95:.3f} ms")
    print(f" P99: {p99:.3f} ms")
    print(f" P99.9: {p99_9:.3f} ms")
    print(f" Mean: {statistics.mean(search_times):.3f} ms")
    print(f" Max: {max(search_times):.3f} ms")

    # Analyze P99 outliers: anything beyond 2x P95 counts as a tail outlier.
    threshold = p95 * 2
    outliers = [t for t in search_times if t > threshold]
    if outliers:
        print(f"\n Outliers (>2x P95): {len(outliers)} queries ({len(outliers)/len(search_times)*100:.1f}%)")
        print(f" Outlier P50: {statistics.median(outliers):.3f} ms")
        print(f" Outlier Max: {max(outliers):.3f} ms")

    # Generate profiling report (cumulative time surfaces nested hot spots).
    stats = pstats.Stats(profiler)
    stats.sort_stats("cumulative")
    print("\nTop 20 functions by cumulative time:")
    print("=" * 80)
    stats.print_stats(20)

    return {
        "p50_ms": p50,
        "p95_ms": p95,
        "p99_ms": p99,
        "p99_9_ms": p99_9,
        "mean_ms": statistics.mean(search_times),
        "max_ms": max(search_times),
        "outlier_count": len(outliers),
        "outlier_percent": len(outliers) / len(search_times) * 100 if search_times else 0,
    }
def profile_retrieval_pipeline(num_docs: int = 5000, num_queries: int = 500):
    """Profile the complete retrieval pipeline end to end.

    Indexes ``num_docs`` synthetic documents, times ``num_queries`` hybrid
    searches, prints percentile statistics and a cProfile report.

    Args:
        num_docs: Number of documents to index.
        num_queries: Number of timed search queries.

    Returns:
        Dict with keys ``p50_ms``, ``p95_ms``, ``p99_ms``, ``mean_ms``, ``max_ms``.
    """
    import time  # hoisted: the original re-imported this on every loop iteration

    print(f"\nProfiling RetrievalPipeline with {num_docs} docs, {num_queries} queries...")
    np.random.seed(42)
    rng = np.random.RandomState(42)  # renamed from `random` to avoid shadowing the module name
    pipeline = RetrievalPipeline(embedding_dim=128, seed=42)

    # Build index with unit-normalized synthetic embeddings.
    for i in range(num_docs):
        text = f"document {i} about topic {i % 10}"
        embedding = rng.randn(128).astype(np.float32)
        embedding = embedding / np.linalg.norm(embedding)
        pipeline.add_document(doc_id=i, text=text, embedding=embedding)

    # Pre-generate query embeddings so the cProfile report covers only
    # pipeline.search calls (RNG sequence is unchanged: docs first, then queries).
    query_embeddings = []
    for _ in range(num_queries):
        q = rng.randn(128).astype(np.float32)
        query_embeddings.append(q / np.linalg.norm(q))

    profiler = cProfile.Profile()
    profiler.enable()
    search_times = []
    for query_embedding in query_embeddings:
        start = time.perf_counter()
        pipeline.search(
            "document topic", query_embedding=query_embedding, top_k=10
        )
        elapsed = time.perf_counter() - start
        search_times.append(elapsed * 1000)  # Convert to ms
    profiler.disable()

    # Percentiles via sorted-index lookup (same methodology as before).
    search_times.sort()
    p50 = search_times[len(search_times) // 2]
    p95 = search_times[int(len(search_times) * 0.95)]
    p99 = search_times[int(len(search_times) * 0.99)]

    print("\nRetrieval Pipeline Latency Statistics:")
    print(f" P50: {p50:.3f} ms")
    print(f" P95: {p95:.3f} ms")
    print(f" P99: {p99:.3f} ms")
    print(f" Mean: {statistics.mean(search_times):.3f} ms")
    print(f" Max: {max(search_times):.3f} ms")

    # Generate profiling report.
    stats = pstats.Stats(profiler)
    stats.sort_stats("cumulative")
    print("\nTop 20 functions by cumulative time:")
    print("=" * 80)
    stats.print_stats(20)

    return {
        "p50_ms": p50,
        "p95_ms": p95,
        "p99_ms": p99,
        "mean_ms": statistics.mean(search_times),
        "max_ms": max(search_times),
    }
def profile_latency_breakdown(num_vectors: int = 5000, dim: int = 128):
    """Profile latency breakdown by component.

    Compares the cost of 100 brute-force distance computations against a
    full HNSW search, averaged over 100 queries, and prints the ratio.

    Args:
        num_vectors: Number of vectors inserted into the index.
        dim: Vector dimensionality.

    Returns:
        Dict with keys ``distance_ms`` (mean brute-force time),
        ``search_ms`` (mean HNSW search time), and ``ratio``
        (search/distance; now also returned so callers can aggregate,
        consistent with the other profiling helpers).
    """
    import time

    print(f"\nProfiling latency breakdown with {num_vectors} vectors...")
    np.random.seed(42)
    hnsw = HNSW(dim=dim, M=16, ef_construction=200, ef_search=50, seed=42)

    # Build index, keeping the raw vectors for the brute-force baseline.
    vectors = []
    for i in range(num_vectors):
        vec = np.random.randn(dim).astype(np.float32)
        vec = vec / np.linalg.norm(vec)
        vectors.append(vec)
        hnsw.add(vec, i)

    # Hoisted loop-invariant: the original re-sliced vectors[:100] per query.
    baseline_sample = vectors[:100]

    search_times = []
    distance_computation_times = []
    for _ in range(100):
        query = np.random.randn(dim).astype(np.float32)
        query = query / np.linalg.norm(query)

        # Profile brute-force distance computations over a fixed 100-vector sample.
        dist_start = time.perf_counter()
        [np.linalg.norm(query - vec) for vec in baseline_sample]
        dist_time = (time.perf_counter() - dist_start) * 1000
        distance_computation_times.append(dist_time)

        # Profile a full HNSW search.
        search_start = time.perf_counter()
        hnsw.search(query, k=10)
        search_time = (time.perf_counter() - search_start) * 1000
        search_times.append(search_time)

    mean_dist = statistics.mean(distance_computation_times)
    mean_search = statistics.mean(search_times)
    print("\nLatency Breakdown:")
    print(f" Distance computation: {mean_dist:.3f} ms (mean)")
    print(f" HNSW search: {mean_search:.3f} ms (mean)")
    print(f" Search/Distance ratio: {mean_search / mean_dist:.2f}x")

    return {
        "distance_ms": mean_dist,
        "search_ms": mean_search,
        "ratio": mean_search / mean_dist,
    }
def main():
    """Run all profiling tasks and write the combined report to ``--output``."""
    import argparse
    import contextlib

    parser = argparse.ArgumentParser(description="Profile tail latency")
    parser.add_argument("--output", type=Path, default=Path("audit/tail_latency_profile.txt"),
                        help="Output file for profiling report")
    parser.add_argument("--num-vectors", type=int, default=10000,
                        help="Number of vectors for HNSW profiling")
    parser.add_argument("--num-docs", type=int, default=5000,
                        help="Number of documents for pipeline profiling")
    parser.add_argument("--num-queries", type=int, default=1000,
                        help="Number of queries to run")
    args = parser.parse_args()
    args.output.parent.mkdir(parents=True, exist_ok=True)

    # Route all profiling prints into the report file. redirect_stdout
    # restores sys.stdout even on exceptions; the original reassigned
    # sys.stdout by hand, which is easy to leak and not exception-safe
    # beyond its manual try/finally.
    with open(args.output, "w") as report:
        with contextlib.redirect_stdout(report):
            # Profile HNSW search in isolation.
            hnsw_stats = profile_hnsw_search(args.num_vectors, 128, args.num_queries)
            # Profile the full pipeline with half the query budget.
            pipeline_stats = profile_retrieval_pipeline(args.num_docs, args.num_queries // 2)
            # Component-level breakdown.
            profile_latency_breakdown(args.num_vectors, 128)

    # Console summary (only reached if profiling succeeded, as before).
    print(f"\nProfiling complete. Report saved to: {args.output}")
    print("\nKey Findings:")
    print(f" HNSW P99: {hnsw_stats['p99_ms']:.3f} ms")
    print(f" Pipeline P99: {pipeline_stats['p99_ms']:.3f} ms")
    if hnsw_stats.get("outlier_count", 0) > 0:
        print(f" HNSW Outliers: {hnsw_stats['outlier_count']} ({hnsw_stats['outlier_percent']:.1f}%)")
# Script entry point: run all profiling tasks when invoked directly.
if __name__ == "__main__":
    main()