292 lines
10 KiB
Python
292 lines
10 KiB
Python
"""HNSW (Hierarchical Navigable Small World) for approximate nearest neighbor search.
|
|
|
|
Implementation based on:
|
|
Malkov, Y. A., & Yashunin, D. A. (2018). Efficient and robust approximate nearest
|
|
neighbor search using Hierarchical Navigable Small World graphs. IEEE transactions
|
|
on pattern analysis and machine intelligence, 42(4), 824-836.
|
|
|
|
See docs/CITATIONS.md for full citation details.
|
|
"""
|
|
|
|
import random
|
|
from typing import Any, Optional
|
|
|
|
import numpy as np
|
|
|
|
|
|
class HNSW:
|
|
"""
|
|
Hierarchical Navigable Small World graph for approximate nearest neighbor search.
|
|
|
|
Implements HNSW with configurable M, efConstruction, and efSearch parameters.
|
|
|
|
Reference:
|
|
Malkov & Yashunin (2018). Efficient and robust approximate nearest neighbor
|
|
search using Hierarchical Navigable Small World graphs.
|
|
"""
|
|
|
|
def __init__(
|
|
self,
|
|
dim: int,
|
|
M: int = 16,
|
|
ef_construction: int = 200,
|
|
ef_search: int = 50,
|
|
ml: float = 1.0 / np.log(2.0),
|
|
seed: Optional[int] = None,
|
|
):
|
|
"""
|
|
Initialize HNSW index.
|
|
|
|
Args:
|
|
dim: Dimension of vectors
|
|
M: Maximum number of connections for each node
|
|
ef_construction: Size of candidate set during construction
|
|
ef_search: Size of candidate set during search
|
|
ml: Normalization factor for level assignment
|
|
seed: Optional random seed for reproducible level assignments.
|
|
If None, uses the global random state.
|
|
"""
|
|
self.dim = dim
|
|
self.M = M
|
|
self.ef_construction = ef_construction
|
|
self.ef_search = ef_search
|
|
self.ml = ml
|
|
|
|
# Instance-level random state for reproducibility
|
|
self._rng = random.Random(seed) if seed is not None else random
|
|
|
|
# Layers: list of graphs, each graph is dict[node_id] -> list[neighbor_ids]
|
|
self._layers: list[dict[int, list[int]]] = []
|
|
self._vectors: dict[int, np.ndarray] = {} # node_id -> vector
|
|
self._max_level: dict[int, int] = {} # node_id -> max level
|
|
self._entry_point: Optional[int] = None
|
|
self._entry_level = 0
|
|
|
|
def _random_level(self) -> int:
|
|
"""Generate random level for new node."""
|
|
level = 0
|
|
while self._rng.random() < np.exp(-self.ml) and level < 10:
|
|
level += 1
|
|
return level
|
|
|
|
def _distance(self, a: np.ndarray, b: np.ndarray) -> float:
|
|
"""Compute L2 distance between two vectors."""
|
|
return float(np.linalg.norm(a - b))
|
|
|
|
def _search_layer(
|
|
self,
|
|
query: np.ndarray,
|
|
k: int,
|
|
entry_points: list[int],
|
|
layer: dict[int, list[int]],
|
|
) -> list[tuple[int, float]]:
|
|
"""
|
|
Search in a single layer using greedy search.
|
|
|
|
Args:
|
|
query: Query vector
|
|
k: Number of results to return
|
|
entry_points: Starting points for search
|
|
layer: Graph layer to search
|
|
|
|
Returns:
|
|
List of (node_id, distance) tuples
|
|
"""
|
|
if not entry_points:
|
|
return []
|
|
|
|
candidates: list[tuple[float, int]] = []
|
|
visited = set(entry_points)
|
|
best_candidates: list[tuple[float, int]] = []
|
|
|
|
# Initialize candidates with entry points
|
|
for ep in entry_points:
|
|
if ep in self._vectors:
|
|
dist = self._distance(query, self._vectors[ep])
|
|
candidates.append((dist, ep))
|
|
best_candidates.append((dist, ep))
|
|
|
|
# Sort by distance
|
|
candidates.sort()
|
|
best_candidates.sort()
|
|
|
|
# Greedy search
|
|
while candidates:
|
|
dist, current = candidates.pop(0)
|
|
|
|
# Explore neighbors
|
|
if current in layer:
|
|
for neighbor in layer[current]:
|
|
if neighbor not in visited:
|
|
visited.add(neighbor)
|
|
if neighbor in self._vectors:
|
|
neighbor_dist = self._distance(query, self._vectors[neighbor])
|
|
candidates.append((neighbor_dist, neighbor))
|
|
best_candidates.append((neighbor_dist, neighbor))
|
|
|
|
# Maintain top-ef_search candidates
|
|
candidates.sort()
|
|
if len(candidates) > self.ef_search:
|
|
candidates = candidates[: self.ef_search]
|
|
|
|
# Sort best candidates and return top-k as (node_id, distance) tuples
|
|
best_candidates.sort()
|
|
results = [(node_id, dist) for dist, node_id in best_candidates[:k]]
|
|
return results
|
|
|
|
def add(self, vec: np.ndarray, vec_id: int) -> None:
|
|
"""
|
|
Add a vector to the index.
|
|
|
|
Args:
|
|
vec: Vector to add (must be of dimension self.dim)
|
|
vec_id: Unique identifier for the vector
|
|
"""
|
|
if vec.shape != (self.dim,):
|
|
raise ValueError(f"Vector dimension mismatch: expected {self.dim}, got {vec.shape[0]}")
|
|
|
|
if vec_id in self._vectors:
|
|
raise ValueError(f"Vector ID {vec_id} already exists")
|
|
|
|
self._vectors[vec_id] = vec.copy()
|
|
level = self._random_level()
|
|
self._max_level[vec_id] = level
|
|
|
|
# Ensure we have enough layers
|
|
while len(self._layers) <= level:
|
|
self._layers.append({})
|
|
|
|
# If this is the first node, set as entry point
|
|
if self._entry_point is None:
|
|
self._entry_point = vec_id
|
|
self._entry_level = level
|
|
for l in range(level + 1):
|
|
self._layers[l][vec_id] = []
|
|
return
|
|
|
|
# Search for nearest neighbors at each level
|
|
entry_points = [self._entry_point]
|
|
|
|
# Start from top layer and work down
|
|
for l in range(min(level, self._entry_level), -1, -1):
|
|
# Search layer for candidates
|
|
candidates = self._search_layer(
|
|
vec, self.ef_construction, entry_points, self._layers[l]
|
|
)
|
|
entry_points = [node_id for node_id, _ in candidates]
|
|
|
|
# Insert at all levels up to node's level
|
|
for l in range(min(level, len(self._layers) - 1) + 1):
|
|
if l == 0:
|
|
# Bottom layer: connect to M neighbors
|
|
candidates = self._search_layer(vec, self.M, entry_points, self._layers[l])
|
|
else:
|
|
# Upper layers: connect to M neighbors
|
|
candidates = self._search_layer(vec, self.M, entry_points, self._layers[l])
|
|
|
|
# Create connections
|
|
neighbors = [node_id for node_id, _ in candidates[: self.M]]
|
|
|
|
if vec_id not in self._layers[l]:
|
|
self._layers[l][vec_id] = []
|
|
|
|
# Add bidirectional connections
|
|
for neighbor in neighbors:
|
|
if neighbor not in self._layers[l]:
|
|
self._layers[l][neighbor] = []
|
|
self._layers[l][vec_id].append(neighbor)
|
|
self._layers[l][neighbor].append(vec_id)
|
|
|
|
# Limit connections to M
|
|
if len(self._layers[l][neighbor]) > self.M:
|
|
# Remove farthest connection
|
|
neighbor_vec = self._vectors[neighbor]
|
|
distances = [
|
|
(self._distance(self._vectors[n], neighbor_vec), n)
|
|
for n in self._layers[l][neighbor]
|
|
]
|
|
distances.sort(reverse=True)
|
|
farthest = distances[0][1]
|
|
self._layers[l][neighbor].remove(farthest)
|
|
if farthest in self._layers[l]:
|
|
self._layers[l][farthest].remove(neighbor)
|
|
|
|
# Limit connections for new node
|
|
if len(self._layers[l][vec_id]) > self.M:
|
|
distances = [
|
|
(self._distance(self._vectors[n], vec), n) for n in self._layers[l][vec_id]
|
|
]
|
|
distances.sort()
|
|
self._layers[l][vec_id] = [n for _, n in distances[: self.M]]
|
|
|
|
entry_points = neighbors
|
|
|
|
# Update entry point if necessary
|
|
if level > self._entry_level:
|
|
self._entry_point = vec_id
|
|
self._entry_level = level
|
|
|
|
def search(self, query: np.ndarray, k: int) -> list[tuple[int, float]]:
|
|
"""
|
|
Search for k nearest neighbors.
|
|
|
|
Args:
|
|
query: Query vector
|
|
k: Number of results to return
|
|
|
|
Returns:
|
|
List of (vector_id, distance) tuples sorted by distance
|
|
"""
|
|
if self._entry_point is None:
|
|
return []
|
|
|
|
if query.shape != (self.dim,):
|
|
raise ValueError(f"Query dimension mismatch: expected {self.dim}, got {query.shape[0]}")
|
|
|
|
# Start from top layer
|
|
current = self._entry_point
|
|
current_level = self._entry_level
|
|
|
|
# Navigate down to level 0
|
|
for l in range(current_level, 0, -1):
|
|
if current not in self._layers[l]:
|
|
continue
|
|
|
|
# Find nearest neighbor in this layer
|
|
neighbors = self._layers[l].get(current, [])
|
|
if not neighbors:
|
|
continue
|
|
|
|
best_dist = self._distance(query, self._vectors[current])
|
|
best_node = current
|
|
|
|
for neighbor in neighbors:
|
|
if neighbor in self._vectors:
|
|
dist = self._distance(query, self._vectors[neighbor])
|
|
if dist < best_dist:
|
|
best_dist = dist
|
|
best_node = neighbor
|
|
|
|
current = best_node
|
|
|
|
# Search layer 0
|
|
results = self._search_layer(query, k, [current], self._layers[0])
|
|
return results
|
|
|
|
def stats(self) -> dict[str, Any]:
|
|
"""
|
|
Get index statistics.
|
|
|
|
Returns:
|
|
Dictionary with index statistics
|
|
"""
|
|
total_edges = sum(sum(len(neighbors) for neighbors in layer.values()) for layer in self._layers)
|
|
return {
|
|
"num_vectors": len(self._vectors),
|
|
"num_layers": len(self._layers),
|
|
"entry_point": self._entry_point,
|
|
"entry_level": self._entry_level,
|
|
"total_edges": total_edges,
|
|
"avg_degree": total_edges / len(self._vectors) if self._vectors else 0.0,
|
|
}
|