Files
llm-rag-ds-optimizer/llmds/admissions.py

136 lines
4.5 KiB
Python

"""Admission controller with rate limiting and QPS tracking."""
import time
from collections import deque
from typing import Optional
class AdmissionController:
"""
Admission controller with token-rate limiting and moving-average QPS.
Controls admission based on token budget and QPS targets.
"""
def __init__(
self,
qps_target: float = 10.0,
token_rate_limit: int = 10000,
window_size: int = 10,
):
"""
Initialize admission controller.
Args:
qps_target: Target queries per second
token_rate_limit: Maximum tokens per second
window_size: Size of moving average window in seconds
"""
self.qps_target = qps_target
self.token_rate_limit = token_rate_limit
self.window_size = window_size
self._request_times: deque[float] = deque()
self._token_history: deque[tuple[float, int]] = deque() # (time, tokens)
self._admitted_requests = 0
self._rejected_requests = 0
def _cleanup_old_requests(self, current_time: float) -> None:
"""Remove requests outside the time window."""
while self._request_times and current_time - self._request_times[0] > self.window_size:
self._request_times.popleft()
while self._token_history and current_time - self._token_history[0][0] > self.window_size:
self._token_history.popleft()
def _get_current_qps(self, current_time: float) -> float:
"""Calculate current QPS over the window."""
self._cleanup_old_requests(current_time)
if not self._request_times:
return 0.0
return len(self._request_times) / self.window_size
def _get_current_token_rate(self, current_time: float) -> float:
"""Calculate current token rate over the window."""
self._cleanup_old_requests(current_time)
if not self._token_history:
return 0.0
total_tokens = sum(tokens for _, tokens in self._token_history)
return total_tokens / self.window_size
def should_admit(self, estimated_tokens: int = 0) -> tuple[bool, str]:
"""
Check if a request should be admitted.
Args:
estimated_tokens: Estimated tokens for this request
Returns:
Tuple of (should_admit, reason)
"""
current_time = time.time()
current_qps = self._get_current_qps(current_time)
current_token_rate = self._get_current_token_rate(current_time)
# Check QPS limit
if current_qps >= self.qps_target:
self._rejected_requests += 1
return False, f"QPS limit exceeded: {current_qps:.2f} >= {self.qps_target}"
# Check token rate limit
if current_token_rate + estimated_tokens / self.window_size > self.token_rate_limit:
self._rejected_requests += 1
return False, f"Token rate limit exceeded"
# Admit request
self._request_times.append(current_time)
if estimated_tokens > 0:
self._token_history.append((current_time, estimated_tokens))
self._admitted_requests += 1
return True, "admitted"
def record_request(self, tokens: int) -> None:
"""
Record a completed request with token count.
Args:
tokens: Number of tokens processed
"""
current_time = time.time()
self._token_history.append((current_time, tokens))
def stats(self) -> dict[str, float]:
"""
Get admission statistics.
Returns:
Dictionary with admission statistics
"""
current_time = time.time()
current_qps = self._get_current_qps(current_time)
current_token_rate = self._get_current_token_rate(current_time)
total_requests = self._admitted_requests + self._rejected_requests
rejection_rate = (
self._rejected_requests / total_requests if total_requests > 0 else 0.0
)
return {
"current_qps": current_qps,
"target_qps": self.qps_target,
"current_token_rate": current_token_rate,
"token_rate_limit": self.token_rate_limit,
"admitted_requests": self._admitted_requests,
"rejected_requests": self._rejected_requests,
"rejection_rate": rejection_rate,
}
def reset(self) -> None:
"""Reset all statistics."""
self._request_times.clear()
self._token_history.clear()
self._admitted_requests = 0
self._rejected_requests = 0