"""HNSW (Hierarchical Navigable Small World) for approximate nearest neighbor search.
Implementation based on:
Malkov, Y. A., & Yashunin, D. A. (2018). Efficient and robust approximate nearest
neighbor search using Hierarchical Navigable Small World graphs. IEEE transactions
on pattern analysis and machine intelligence, 42(4), 824-836.
See docs/CITATIONS.md for full citation details.
"""
import random
from typing import Any, Optional
import numpy as np
class HNSW:
    """
    Hierarchical Navigable Small World graph for approximate nearest neighbor search.

    Implements HNSW with configurable M, efConstruction, and efSearch parameters.

    Reference:
        Malkov & Yashunin (2018). Efficient and robust approximate nearest neighbor
        search using Hierarchical Navigable Small World graphs.
    """

    # Hard cap on a node's layer height so a pathological RNG streak cannot
    # create an unbounded number of layers.
    _MAX_LEVEL = 10

    def __init__(
        self,
        dim: int,
        M: int = 16,
        ef_construction: int = 200,
        ef_search: int = 50,
        ml: float = 1.0 / np.log(2.0),
        seed: Optional[int] = None,
    ):
        """
        Initialize HNSW index.

        Args:
            dim: Dimension of vectors
            M: Maximum number of connections for each node
            ef_construction: Size of candidate set during construction
            ef_search: Size of candidate set during search
            ml: Normalization factor for level assignment (P(level >= l) = exp(-l/ml))
            seed: Optional random seed for reproducible level assignments.
                If None, uses the global random state.
        """
        self.dim = dim
        self.M = M
        self.ef_construction = ef_construction
        self.ef_search = ef_search
        self.ml = ml
        # Instance-level random state for reproducibility
        self._rng = random.Random(seed) if seed is not None else random
        # Layers: list of graphs, each graph is dict[node_id] -> list[neighbor_ids]
        self._layers: list[dict[int, list[int]]] = []
        self._vectors: dict[int, np.ndarray] = {}  # node_id -> vector
        self._max_level: dict[int, int] = {}  # node_id -> max level
        self._entry_point: Optional[int] = None
        self._entry_level = 0

    def _random_level(self) -> int:
        """Generate a random level for a new node.

        Levels follow the geometric distribution from the paper:
        P(level >= l) = exp(-l / ml), so the per-level continuation
        probability is exp(-1/ml) — 0.5 for the default ml = 1/ln(2).
        (The previous code used exp(-ml), which inverts the role of ml and
        produced a continuation probability of ~0.24 instead of 0.5.)
        """
        p = float(np.exp(-1.0 / self.ml))
        level = 0
        while self._rng.random() < p and level < self._MAX_LEVEL:
            level += 1
        return level

    def _distance(self, a: np.ndarray, b: np.ndarray) -> float:
        """Compute L2 (Euclidean) distance between two vectors."""
        return float(np.linalg.norm(a - b))

    def _search_layer(
        self,
        query: np.ndarray,
        k: int,
        entry_points: list[int],
        layer: dict[int, list[int]],
    ) -> list[tuple[int, float]]:
        """
        Best-first search within a single layer.

        Args:
            query: Query vector
            k: Number of results to return
            entry_points: Starting points for search
            layer: Graph layer to search

        Returns:
            List of (node_id, distance) tuples sorted by ascending distance.

        Note:
            The expansion frontier is bounded by max(k, ef_search).  Bounding
            by ef_search alone (as before) silently capped construction-time
            searches, which request k = ef_construction candidates.
        """
        if not entry_points:
            return []
        ef = max(k, self.ef_search)
        visited = set(entry_points)
        frontier: list[tuple[float, int]] = []  # nodes still to expand
        found: list[tuple[float, int]] = []     # every node ever reached
        for ep in entry_points:
            vec = self._vectors.get(ep)
            if vec is not None:
                dist = self._distance(query, vec)
                frontier.append((dist, ep))
                found.append((dist, ep))
        frontier.sort()
        # Greedy expansion: always pop the closest unexpanded node.
        while frontier:
            _, current = frontier.pop(0)
            for neighbor in layer.get(current, []):
                if neighbor in visited:
                    continue
                visited.add(neighbor)
                nvec = self._vectors.get(neighbor)
                if nvec is None:
                    continue
                ndist = self._distance(query, nvec)
                frontier.append((ndist, neighbor))
                found.append((ndist, neighbor))
            # Bound the frontier so work stays proportional to ef.
            frontier.sort()
            if len(frontier) > ef:
                del frontier[ef:]
        found.sort()
        return [(node_id, dist) for dist, node_id in found[:k]]

    def _connect(self, level: int, a: int, b: int) -> None:
        """Add the undirected edge (a, b) on the given layer, without duplicates."""
        layer = self._layers[level]
        layer.setdefault(a, [])
        layer.setdefault(b, [])
        if b not in layer[a]:
            layer[a].append(b)
        if a not in layer[b]:
            layer[b].append(a)

    def _prune(self, level: int, node: int) -> None:
        """Trim node's adjacency on a layer to its M nearest neighbors.

        Reverse edges of dropped neighbors are removed too, keeping the graph
        symmetric.  (The previous code dropped edges one-sided, which could
        later make an unguarded list.remove() raise ValueError.)
        """
        layer = self._layers[level]
        links = layer.get(node, [])
        if len(links) <= self.M:
            return
        node_vec = self._vectors[node]
        by_dist = sorted(links, key=lambda n: self._distance(self._vectors[n], node_vec))
        for dropped in by_dist[self.M:]:
            if dropped in layer and node in layer[dropped]:
                layer[dropped].remove(node)
        layer[node] = by_dist[: self.M]

    def add(self, vec: np.ndarray, vec_id: int) -> None:
        """
        Add a vector to the index.

        Args:
            vec: Vector to add (must be of dimension self.dim)
            vec_id: Unique identifier for the vector

        Raises:
            ValueError: If the vector dimension is wrong or vec_id already exists.
        """
        if vec.shape != (self.dim,):
            raise ValueError(f"Vector dimension mismatch: expected {self.dim}, got {vec.shape[0]}")
        if vec_id in self._vectors:
            raise ValueError(f"Vector ID {vec_id} already exists")
        self._vectors[vec_id] = vec.copy()
        level = self._random_level()
        self._max_level[vec_id] = level
        # Ensure we have enough layers, and give the node an (initially empty)
        # adjacency list on every layer it belongs to.
        while len(self._layers) <= level:
            self._layers.append({})
        for l in range(level + 1):
            self._layers[l].setdefault(vec_id, [])
        # First node becomes the entry point.
        if self._entry_point is None:
            self._entry_point = vec_id
            self._entry_level = level
            return
        # Phase 1: greedy descent through the layers ABOVE the node's level to
        # refine the entry point.  (The previous code skipped these layers
        # entirely and started the insertion search from the raw entry point.)
        entry_points = [self._entry_point]
        for l in range(self._entry_level, level, -1):
            nearest = self._search_layer(vec, 1, entry_points, self._layers[l])
            if nearest:
                entry_points = [nearest[0][0]]
        # Phase 2: insert on each layer from min(level, entry_level) down to 0.
        # This never touches layers above the entry level, so nodes can no
        # longer be inserted into layers above their assigned level.
        for l in range(min(level, self._entry_level), -1, -1):
            candidates = self._search_layer(
                vec, self.ef_construction, entry_points, self._layers[l]
            )
            # Connect to the M closest candidates; prune any neighbor that
            # now exceeds M connections.
            for neighbor, _ in candidates[: self.M]:
                self._connect(l, vec_id, neighbor)
                self._prune(l, neighbor)
            # Candidates seed the search on the next (lower) layer.
            entry_points = [node_id for node_id, _ in candidates]
        # Update entry point if the new node is taller.
        if level > self._entry_level:
            self._entry_point = vec_id
            self._entry_level = level

    def search(self, query: np.ndarray, k: int) -> list[tuple[int, float]]:
        """
        Search for the k approximate nearest neighbors.

        Args:
            query: Query vector
            k: Number of results to return

        Returns:
            List of (vector_id, distance) tuples sorted by ascending distance.

        Raises:
            ValueError: If the query dimension is wrong.
        """
        if self._entry_point is None:
            return []
        if query.shape != (self.dim,):
            raise ValueError(f"Query dimension mismatch: expected {self.dim}, got {query.shape[0]}")
        current = self._entry_point
        best_dist = self._distance(query, self._vectors[current])
        # Greedy descent: on each upper layer, keep moving to the closest
        # neighbor until a local minimum is reached.  (The previous code took
        # at most a single step per layer, which could strand the search far
        # from the target region.)
        for l in range(self._entry_level, 0, -1):
            layer = self._layers[l]
            improved = True
            while improved:
                improved = False
                for neighbor in layer.get(current, []):
                    nvec = self._vectors.get(neighbor)
                    if nvec is None:
                        continue
                    dist = self._distance(query, nvec)
                    if dist < best_dist:
                        best_dist = dist
                        current = neighbor
                        improved = True
        # Full best-first search on the bottom layer.
        return self._search_layer(query, k, [current], self._layers[0])

    def stats(self) -> dict[str, Any]:
        """
        Get index statistics.

        Returns:
            Dictionary with vector count, layer count, entry point/level,
            total directed edge count, and average degree.
        """
        total_edges = sum(
            sum(len(neighbors) for neighbors in layer.values()) for layer in self._layers
        )
        return {
            "num_vectors": len(self._vectors),
            "num_layers": len(self._layers),
            "entry_point": self._entry_point,
            "entry_level": self._entry_level,
            "total_edges": total_edges,
            "avg_degree": total_edges / len(self._vectors) if self._vectors else 0.0,
        }