136 lines
4.5 KiB
Python
136 lines
4.5 KiB
Python
"""Admission controller with rate limiting and QPS tracking."""
|
|
|
|
import time
from collections import deque
from collections.abc import Callable
from typing import Optional
|
|
|
|
|
|
class AdmissionController:
    """
    Admission controller with token-rate limiting and moving-average QPS.

    Controls admission based on token budget and QPS targets. Both rates are
    moving averages over a sliding window of ``window_size`` seconds. The
    number of admitted requests (or tokens) whose timestamps are still inside
    the window is divided by the full window length.
    """

    def __init__(
        self,
        qps_target: float = 10.0,
        token_rate_limit: int = 10000,
        window_size: int = 10,
        clock: Optional[Callable[[], float]] = None,
    ):
        """
        Initialize admission controller.

        Args:
            qps_target: Target queries per second. A request is rejected once
                the windowed QPS reaches this value.
            token_rate_limit: Maximum tokens per second (windowed average).
            window_size: Size of moving average window in seconds; must be > 0.
            clock: Zero-argument callable returning the current time in
                seconds. Defaults to ``time.monotonic``, so wall-clock
                adjustments cannot stretch or collapse the sliding window.

        Raises:
            ValueError: If ``window_size`` is not positive. Every rate is
                divided by it.
        """
        if window_size <= 0:
            raise ValueError(f"window_size must be positive, got {window_size}")
        self.qps_target = qps_target
        self.token_rate_limit = token_rate_limit
        self.window_size = window_size
        self._clock: Callable[[], float] = clock if clock is not None else time.monotonic
        self._request_times: deque[float] = deque()  # timestamps of admitted requests
        self._token_history: deque[tuple[float, int]] = deque()  # (time, tokens)
        self._admitted_requests = 0
        self._rejected_requests = 0

    def _cleanup_old_requests(self, current_time: float) -> None:
        """Remove requests and token records older than ``window_size`` seconds.

        Both deques are appended in time order, so expired entries are always
        at the left end and can be popped until the first live one.
        """
        while self._request_times and current_time - self._request_times[0] > self.window_size:
            self._request_times.popleft()

        while self._token_history and current_time - self._token_history[0][0] > self.window_size:
            self._token_history.popleft()

    def _get_current_qps(self, current_time: float) -> float:
        """Return admitted requests per second, averaged over the window."""
        self._cleanup_old_requests(current_time)
        # An empty window yields 0 / window_size == 0.0.
        return len(self._request_times) / self.window_size

    def _get_current_token_rate(self, current_time: float) -> float:
        """Return tokens per second currently in the window, averaged over it."""
        self._cleanup_old_requests(current_time)
        total_tokens = sum(tokens for _, tokens in self._token_history)
        return total_tokens / self.window_size

    def should_admit(self, estimated_tokens: int = 0) -> tuple[bool, str]:
        """
        Check if a request should be admitted, recording it if so.

        On admission, the request's timestamp is added to the QPS window.
        If ``estimated_tokens > 0``, the estimate is also added to the token
        history. Every call increments either the admitted or the rejected
        counter.

        Args:
            estimated_tokens: Estimated tokens for this request

        Returns:
            Tuple of (should_admit, reason). ``reason`` is ``"admitted"`` on
            success, or a message starting with ``"QPS limit exceeded"`` or
            ``"Token rate limit exceeded"``.
        """
        current_time = self._clock()
        current_qps = self._get_current_qps(current_time)
        current_token_rate = self._get_current_token_rate(current_time)

        # Check QPS limit
        if current_qps >= self.qps_target:
            self._rejected_requests += 1
            return False, f"QPS limit exceeded: {current_qps:.2f} >= {self.qps_target}"

        # Check token rate limit as if this request's tokens were already in the window.
        projected_token_rate = current_token_rate + estimated_tokens / self.window_size
        if projected_token_rate > self.token_rate_limit:
            self._rejected_requests += 1
            return False, (
                f"Token rate limit exceeded: {projected_token_rate:.2f} > "
                f"{self.token_rate_limit}"
            )

        # Admit request
        self._request_times.append(current_time)
        if estimated_tokens > 0:
            self._token_history.append((current_time, estimated_tokens))
        self._admitted_requests += 1

        return True, "admitted"

    def record_request(self, tokens: int) -> None:
        """
        Record a completed request with token count.

        NOTE(review): ``should_admit()`` already appends the *estimated*
        tokens of each admitted request to the same history. Recording the
        actual count here for that same request therefore counts it twice.
        Confirm the intended calling convention with callers.

        Args:
            tokens: Number of tokens processed
        """
        current_time = self._clock()
        self._token_history.append((current_time, tokens))
        # Prune here as well. Otherwise callers that only record (and never
        # call should_admit()/stats()) would grow the history without bound.
        self._cleanup_old_requests(current_time)

    def stats(self) -> dict[str, float]:
        """
        Get admission statistics.

        Returns:
            Dictionary with current/target QPS, current token rate and its
            limit, admitted/rejected counts, and the rejection rate (0.0 when
            no requests have been evaluated yet).
        """
        current_time = self._clock()
        current_qps = self._get_current_qps(current_time)
        current_token_rate = self._get_current_token_rate(current_time)

        total_requests = self._admitted_requests + self._rejected_requests
        rejection_rate = (
            self._rejected_requests / total_requests if total_requests > 0 else 0.0
        )

        return {
            "current_qps": current_qps,
            "target_qps": self.qps_target,
            "current_token_rate": current_token_rate,
            "token_rate_limit": self.token_rate_limit,
            "admitted_requests": self._admitted_requests,
            "rejected_requests": self._rejected_requests,
            "rejection_rate": rejection_rate,
        }

    def reset(self) -> None:
        """Reset all statistics (windows and counters); configuration is kept."""
        self._request_times.clear()
        self._token_history.clear()
        self._admitted_requests = 0
        self._rejected_requests = 0
|
|
|