Initial commit: LLM-DS optimizer framework with data files excluded
This commit is contained in:
272
llmds/indexed_heap.py
Normal file
272
llmds/indexed_heap.py
Normal file
@@ -0,0 +1,272 @@
|
||||
"""Indexed binary heap with decrease/increase-key operations."""
|
||||
|
||||
from typing import Optional
|
||||
|
||||
|
||||
class IndexedHeap:
|
||||
"""
|
||||
Indexed binary heap supporting O(log n) decrease/increase-key operations.
|
||||
|
||||
Maintains a heap of (score, id) pairs with an index map for O(1) lookup
|
||||
and O(log n) updates.
|
||||
"""
|
||||
|
||||
def __init__(self, max_heap: bool = False):
|
||||
"""
|
||||
Initialize indexed heap.
|
||||
|
||||
Args:
|
||||
max_heap: If True, use max-heap (largest score at top),
|
||||
otherwise min-heap (smallest score at top)
|
||||
"""
|
||||
self._heap: list[tuple[float, int]] = [] # (score, id)
|
||||
self._pos: dict[int, int] = {} # id -> index in heap
|
||||
self._max_heap = max_heap
|
||||
|
||||
def _compare(self, a: float, b: float) -> bool:
|
||||
"""Compare two scores based on heap type."""
|
||||
if self._max_heap:
|
||||
return a > b
|
||||
return a < b
|
||||
|
||||
def _swap(self, i: int, j: int) -> None:
|
||||
"""Swap elements at indices i and j, updating position map."""
|
||||
self._heap[i], self._heap[j] = self._heap[j], self._heap[i]
|
||||
_, id_i = self._heap[i]
|
||||
_, id_j = self._heap[j]
|
||||
self._pos[id_i] = i
|
||||
self._pos[id_j] = j
|
||||
|
||||
def _bubble_up(self, idx: int) -> None:
|
||||
"""Bubble up element at idx to maintain heap property."""
|
||||
while idx > 0:
|
||||
parent = (idx - 1) // 2
|
||||
score_curr, _ = self._heap[idx]
|
||||
score_parent, _ = self._heap[parent]
|
||||
|
||||
if self._compare(score_curr, score_parent):
|
||||
self._swap(idx, parent)
|
||||
idx = parent
|
||||
else:
|
||||
break
|
||||
|
||||
def _bubble_down(self, idx: int) -> None:
|
||||
"""Bubble down element at idx to maintain heap property."""
|
||||
while True:
|
||||
left = 2 * idx + 1
|
||||
right = 2 * idx + 2
|
||||
best = idx
|
||||
|
||||
if left < len(self._heap):
|
||||
score_best, _ = self._heap[best]
|
||||
score_left, _ = self._heap[left]
|
||||
if self._compare(score_left, score_best):
|
||||
best = left
|
||||
|
||||
if right < len(self._heap):
|
||||
score_best, _ = self._heap[best]
|
||||
score_right, _ = self._heap[right]
|
||||
if self._compare(score_right, score_best):
|
||||
best = right
|
||||
|
||||
if best != idx:
|
||||
self._swap(idx, best)
|
||||
idx = best
|
||||
else:
|
||||
break
|
||||
|
||||
def push(self, key_id: int, score: float) -> None:
|
||||
"""
|
||||
Push an item onto the heap.
|
||||
|
||||
Args:
|
||||
key_id: Unique identifier for the item
|
||||
score: Score/priority value
|
||||
"""
|
||||
if key_id in self._pos:
|
||||
raise ValueError(f"Key {key_id} already exists in heap")
|
||||
|
||||
idx = len(self._heap)
|
||||
self._heap.append((score, key_id))
|
||||
self._pos[key_id] = idx
|
||||
self._bubble_up(idx)
|
||||
|
||||
def pop(self) -> tuple[float, int]:
|
||||
"""
|
||||
Pop the top element from the heap.
|
||||
|
||||
Returns:
|
||||
Tuple of (score, id)
|
||||
|
||||
Raises:
|
||||
IndexError: If heap is empty
|
||||
"""
|
||||
if not self._heap:
|
||||
raise IndexError("Cannot pop from empty heap")
|
||||
|
||||
if len(self._heap) == 1:
|
||||
score, key_id = self._heap.pop()
|
||||
del self._pos[key_id]
|
||||
return score, key_id
|
||||
|
||||
# Swap root with last element
|
||||
self._swap(0, len(self._heap) - 1)
|
||||
score, key_id = self._heap.pop()
|
||||
del self._pos[key_id]
|
||||
|
||||
if self._heap:
|
||||
self._bubble_down(0)
|
||||
|
||||
return score, key_id
|
||||
|
||||
def decrease_key(self, key_id: int, new_score: float) -> None:
|
||||
"""
|
||||
Decrease the key value for an item.
|
||||
|
||||
For min-heap: new_score must be < old_score (bubble up).
|
||||
For max-heap: new_score must be < old_score (bubble down).
|
||||
|
||||
Args:
|
||||
key_id: Item identifier
|
||||
new_score: New score value
|
||||
|
||||
Raises:
|
||||
KeyError: If key_id not found
|
||||
ValueError: If new_score doesn't satisfy heap property
|
||||
"""
|
||||
if key_id not in self._pos:
|
||||
raise KeyError(f"Key {key_id} not found in heap")
|
||||
|
||||
idx = self._pos[key_id]
|
||||
old_score, _ = self._heap[idx]
|
||||
|
||||
# Validate direction - both heap types decrease when new < old
|
||||
if new_score >= old_score:
|
||||
heap_type = "max-heap" if self._max_heap else "min-heap"
|
||||
raise ValueError(f"For {heap_type}, new_score must be < old_score")
|
||||
|
||||
self._heap[idx] = (new_score, key_id)
|
||||
|
||||
# Bubble direction depends on heap type
|
||||
if self._max_heap:
|
||||
# Max-heap: decreasing score means lower priority -> bubble down
|
||||
self._bubble_down(idx)
|
||||
else:
|
||||
# Min-heap: decreasing score means higher priority -> bubble up
|
||||
self._bubble_up(idx)
|
||||
|
||||
def increase_key(self, key_id: int, new_score: float) -> None:
|
||||
"""
|
||||
Increase the key value for an item.
|
||||
|
||||
For min-heap: new_score must be > old_score (bubble down).
|
||||
For max-heap: new_score must be > old_score (bubble up).
|
||||
|
||||
Args:
|
||||
key_id: Item identifier
|
||||
new_score: New score value
|
||||
|
||||
Raises:
|
||||
KeyError: If key_id not found
|
||||
ValueError: If new_score doesn't satisfy heap property
|
||||
"""
|
||||
if key_id not in self._pos:
|
||||
raise KeyError(f"Key {key_id} not found in heap")
|
||||
|
||||
idx = self._pos[key_id]
|
||||
old_score, _ = self._heap[idx]
|
||||
|
||||
# Validate direction - both heap types increase when new > old
|
||||
if new_score <= old_score:
|
||||
heap_type = "max-heap" if self._max_heap else "min-heap"
|
||||
raise ValueError(f"For {heap_type}, new_score must be > old_score")
|
||||
|
||||
self._heap[idx] = (new_score, key_id)
|
||||
|
||||
# Bubble direction depends on heap type
|
||||
if self._max_heap:
|
||||
# Max-heap: increasing score means higher priority -> bubble up
|
||||
self._bubble_up(idx)
|
||||
else:
|
||||
# Min-heap: increasing score means lower priority -> bubble down
|
||||
self._bubble_down(idx)
|
||||
|
||||
def delete(self, key_id: int) -> tuple[float, int]:
|
||||
"""
|
||||
Delete an item from the heap.
|
||||
|
||||
Args:
|
||||
key_id: Item identifier
|
||||
|
||||
Returns:
|
||||
Tuple of (score, id) that was deleted
|
||||
|
||||
Raises:
|
||||
KeyError: If key_id not found
|
||||
"""
|
||||
if key_id not in self._pos:
|
||||
raise KeyError(f"Key {key_id} not found in heap")
|
||||
|
||||
idx = self._pos[key_id]
|
||||
score, _ = self._heap[idx]
|
||||
|
||||
# Swap with last element
|
||||
self._swap(idx, len(self._heap) - 1)
|
||||
self._heap.pop()
|
||||
del self._pos[key_id]
|
||||
|
||||
# Restore heap property
|
||||
if idx < len(self._heap):
|
||||
# Try bubbling up first (might be smaller/bigger than parent)
|
||||
parent = (idx - 1) // 2
|
||||
if idx > 0:
|
||||
score_curr, _ = self._heap[idx]
|
||||
score_parent, _ = self._heap[parent]
|
||||
if self._compare(score_curr, score_parent):
|
||||
self._bubble_up(idx)
|
||||
return score, key_id
|
||||
|
||||
# Otherwise bubble down
|
||||
self._bubble_down(idx)
|
||||
|
||||
return score, key_id
|
||||
|
||||
def peek(self) -> Optional[tuple[float, int]]:
|
||||
"""
|
||||
Peek at the top element without removing it.
|
||||
|
||||
Returns:
|
||||
Tuple of (score, id) or None if empty
|
||||
"""
|
||||
if not self._heap:
|
||||
return None
|
||||
return self._heap[0]
|
||||
|
||||
def get_score(self, key_id: int) -> Optional[float]:
|
||||
"""
|
||||
Get the score for a given key_id.
|
||||
|
||||
Args:
|
||||
key_id: Item identifier
|
||||
|
||||
Returns:
|
||||
Score value or None if not found
|
||||
"""
|
||||
if key_id not in self._pos:
|
||||
return None
|
||||
idx = self._pos[key_id]
|
||||
score, _ = self._heap[idx]
|
||||
return score
|
||||
|
||||
def size(self) -> int:
|
||||
"""Get the number of elements in the heap."""
|
||||
return len(self._heap)
|
||||
|
||||
def is_empty(self) -> bool:
|
||||
"""Check if heap is empty."""
|
||||
return len(self._heap) == 0
|
||||
|
||||
def contains(self, key_id: int) -> bool:
|
||||
"""Check if key_id exists in heap."""
|
||||
return key_id in self._pos
|
||||
|
||||
Reference in New Issue
Block a user