Adding hash algorithms

This commit is contained in:
Carlos Gutierrez
2025-11-24 17:11:51 -05:00
commit 7416dfed38
25 changed files with 10315 additions and 0 deletions

47
.gitignore vendored Normal file
View File

@@ -0,0 +1,47 @@
# Python
__pycache__/
*.py[cod]
*$py.class
papers/
report.tex
*.so
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
*.egg-info/
.installed.cfg
*.egg
# Virtual Environment
.venv/
venv/
ENV/
env/
# IDE
.vscode/
.idea/
*.swp
*.swo
*~
# Testing
.pytest_cache/
.coverage
htmlcov/
.tox/
# OS
.DS_Store
Thumbs.db

22
LICENSE Normal file
View File

@@ -0,0 +1,22 @@
MIT License
Copyright (c) 2025 Carlos Gutierrez
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

230
README.md Normal file
View File

@@ -0,0 +1,230 @@
# MSCS532 Assignment 7: Hash Tables and Their Practical Applications
**Author:** Carlos Gutierrez
**Email:** cgutierrez44833@ucumberlands.edu
**Course:** MSCS532 Data Structures and Algorithms
**Assignment:** Hash Tables and Their Practical Applications
## Overview
This assignment presents a comprehensive study of hash tables, covering direct-address tables, hash functions, open addressing, and separate chaining. It includes implementations of several hash table data structures, hash function designs (both good and bad examples), theoretical complexity analysis, empirical benchmarking, test coverage, and reproducible visualization assets.
## Repository Structure
```
MSCS532_Assignment7/
├── docs/
│ ├── hash_function_comparison.png # Hash function performance comparison
│ ├── open_addressing_vs_chaining.png # Collision resolution strategies comparison
│ ├── load_factor_impact.png # Performance at different load factors
│ └── collision_analysis.png # Collision analysis for different hash functions
├── examples/
│ ├── hash_tables_demo.py # Hash table demonstrations
│ └── generate_plots.py # Script to reproduce all plots
├── src/
│ ├── hash_functions.py # Various hash function implementations
│ ├── hash_tables.py # Hash table data structures
│ ├── benchmark.py # Benchmarking utilities
│ └── __init__.py # Package initialization
├── tests/
│ ├── test_hash_functions.py # Tests for hash functions
│ └── test_hash_tables.py # Tests for hash tables
├── papers/ # Reference papers (PDFs)
├── requirements.txt # Python dependencies
├── README.md # Project documentation (this file)
└── REPORT.md # Detailed analysis report
```
## Part 1: Hash Functions and Their Impact
### Implementation
#### Good Hash Functions
* **Division Method:** `h(k) = k mod m`
* Simple and fast
* Requires careful choice of table size (preferably prime)
* **Multiplication Method:** `h(k) = floor(m * (kA mod 1))`
* Good distribution with proper choice of A
* Default A = (√5 - 1)/2 ≈ 0.618
* **Universal Hash Functions:** `h(k) = ((a*k + b) mod p) mod m`
* Minimizes collisions for any set of keys
* Requires random parameters a, b and prime p
* **Polynomial String Hash:** Rolling hash for strings
* Better distribution than simple summation
* Base 31 is commonly used
* **DJB2 Hash:** Popular string hash function
* Known for good distribution properties
#### Bad Hash Functions (Demonstration)
* **Simple String Hash:** Sums character values
* Prone to collisions for similar strings
* Poor distribution
* **Bad Clustering Hash:** Demonstrates clustering behavior
* Causes many collisions and poor performance
### API Highlights
**Hash Functions:**
```python
division_hash(key: int, table_size: int) -> int
multiplication_hash(key: int, table_size: int, A: float) -> int
universal_hash(key: int, table_size: int, a: int, b: int, p: int) -> int
string_hash_polynomial(key: str, table_size: int, base: int) -> int
string_hash_simple(key: str, table_size: int) -> int # BAD EXAMPLE
bad_hash_clustering(key: int, table_size: int) -> int # BAD EXAMPLE
```
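For illustration, the division, multiplication, and polynomial-string methods above can be sketched in a few lines. This is a standalone re-implementation of the stated formulas, not the code in `src/hash_functions.py`:

```python
import math

def division_hash(key: int, table_size: int) -> int:
    # h(k) = k mod m; works best when m is prime
    return key % table_size

def multiplication_hash(key: int, table_size: int,
                        A: float = (math.sqrt(5) - 1) / 2) -> int:
    # h(k) = floor(m * (k*A mod 1)); A = (sqrt(5) - 1)/2 by default
    return int(table_size * ((key * A) % 1))

def string_hash_polynomial(key: str, table_size: int, base: int = 31) -> int:
    # Rolling polynomial hash: h = (h * base + ord(c)) mod m per character
    h = 0
    for ch in key:
        h = (h * base + ord(ch)) % table_size
    return h

print(division_hash(42, 11), multiplication_hash(42, 11),
      string_hash_polynomial("hash", 11))
```

Taking the modulus at every step of the polynomial hash keeps intermediate values small without changing the result.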
## Part 2: Hash Table Data Structures
### Implementation
#### Direct-Address Table
* **File:** `src/hash_tables.py`
* **Operations:** insert, search, delete
* **Time Complexity:** O(1) for all operations
* **Space Complexity:** O(m) where m is the key range
* **Use Case:** When keys are integers in a small known range
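As a minimal sketch (not the `DirectAddressTable` in `src/hash_tables.py` itself), a direct-address table is simply an array indexed directly by the key:

```python
class DirectAddressTable:
    """Direct addressing: keys must be integers in [0, size)."""

    def __init__(self, size: int):
        self.slots = [None] * size

    def insert(self, key: int, value) -> None:
        self.slots[key] = value   # O(1): key is the index

    def search(self, key: int):
        return self.slots[key]    # O(1)

    def delete(self, key: int) -> None:
        self.slots[key] = None    # O(1)
```

The O(m) space cost is visible here: the array is allocated for the whole key range regardless of how many keys are stored.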
#### Open Addressing
* **File:** `src/hash_tables.py`
* **Probe Types:**
* Linear Probing: `h(k,i) = (h'(k) + i) mod m`
* Quadratic Probing: `h(k,i) = (h'(k) + c1*i + c2*i²) mod m`
* Double Hashing: `h(k,i) = (h1(k) + i*h2(k)) mod m`
* **Operations:** insert, search, delete
* **Time Complexity:**
* Best/Average: O(1)
* Worst: O(n) due to clustering
* **Space Complexity:** O(n) where n is number of elements
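The linear-probing case of the probe sequences above can be sketched as follows (a simplified stand-in for `HashTableOpenAddressing`; tombstone-based deletion is omitted for brevity):

```python
class LinearProbingTable:
    """Open addressing with linear probing: h(k, i) = (k mod m + i) mod m."""

    _EMPTY = object()  # sentinel distinguishing empty slots from stored None

    def __init__(self, size: int):
        self.size = size
        self.keys = [self._EMPTY] * size
        self.values = [None] * size

    def insert(self, key: int, value) -> None:
        for i in range(self.size):
            idx = (key % self.size + i) % self.size
            if self.keys[idx] is self._EMPTY or self.keys[idx] == key:
                self.keys[idx], self.values[idx] = key, value
                return
        raise OverflowError("hash table is full")

    def search(self, key: int):
        for i in range(self.size):
            idx = (key % self.size + i) % self.size
            if self.keys[idx] is self._EMPTY:
                return None       # hit an empty slot: key cannot be present
            if self.keys[idx] == key:
                return self.values[idx]
        return None
```

Primary clustering is easy to see here: every collision pushes the new key into the next free slot, lengthening the run that later probes must scan.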
#### Separate Chaining
* **File:** `src/hash_tables.py`
* **Implementation:** Each bucket contains a linked list
* **Operations:** insert, search, delete
* **Time Complexity:**
* Best/Average: O(1)
* Worst: O(n) if all keys hash to same bucket
* **Space Complexity:** O(n + m) where m is table size
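A bucket-per-slot sketch of the chaining scheme (Python lists stand in for the linked lists described above; this is illustrative, not the repository's implementation):

```python
class ChainingTable:
    """Separate chaining: each bucket holds all (key, value) pairs that hash there."""

    def __init__(self, size: int):
        self.buckets = [[] for _ in range(size)]

    def insert(self, key, value) -> None:
        bucket = self.buckets[hash(key) % len(self.buckets)]
        for i, (k, _) in enumerate(bucket):
            if k == key:                 # update an existing key in place
                bucket[i] = (key, value)
                return
        bucket.append((key, value))

    def search(self, key):
        bucket = self.buckets[hash(key) % len(self.buckets)]
        for k, v in bucket:
            if k == key:
                return v
        return None

    def delete(self, key) -> bool:
        bucket = self.buckets[hash(key) % len(self.buckets)]
        for i, (k, _) in enumerate(bucket):
            if k == key:
                del bucket[i]
                return True
        return False
```

Deletion is a plain list removal from one bucket, which is why chaining avoids the tombstone bookkeeping that open addressing needs.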
### Theoretical Performance Analysis
| Operation | Direct-Address | Open Addressing | Separate Chaining |
|-----------|---------------|-----------------|-------------------|
| Search | O(1) | O(1) avg, O(n) worst | O(1) avg, O(n) worst |
| Insert | O(1) | O(1) avg, O(n) worst | O(1) avg, O(n) worst |
| Delete | O(1) | O(1) avg, O(n) worst | O(1) avg, O(n) worst |
| Space | O(m) | O(n) | O(n + m) |
**Key Insights:**
* **Direct-Address:** Fastest but requires keys in known range
* **Open Addressing:** Better cache performance, but clustering can degrade performance
* **Separate Chaining:** More robust to high load factors, easier deletion
## Getting Started
### Prerequisites
* Python 3.10 or later
* A virtual environment is recommended
### Installation
```bash
python -m venv .venv
source .venv/bin/activate # On Windows: .venv\Scripts\activate
pip install -r requirements.txt
```
## Running the Examples
```bash
python examples/hash_tables_demo.py # Hash table demonstrations
python examples/generate_plots.py # Regenerate all figures in docs/
```
## Running Tests
```bash
python -m pytest
```
The test suite verifies correctness for:
* All hash function implementations
* Direct-address table
* Open addressing (all probe types)
* Separate chaining
## Reproducing the Empirical Study
1. Activate your environment and install dependencies.
2. Run `python examples/generate_plots.py`.
* Benchmarks may take several minutes depending on hardware.
3. Generated figures will be written to the `docs/` directory.
## Visualizations
The following visualizations demonstrate key findings from our empirical analysis:
### Hash Function Comparison
![Hash Function Comparison](docs/hash_function_comparison.png)
This visualization compares different hash functions across multiple metrics: collision rates, distribution variance, execution time, and maximum chain lengths. It clearly demonstrates the impact of hash function design on performance.
### Open Addressing vs. Separate Chaining
![Open Addressing vs. Separate Chaining](docs/open_addressing_vs_chaining.png)
This comprehensive comparison shows insert, search, and delete performance across different data sizes for both open addressing (linear, quadratic, double hashing) and separate chaining methods.
### Load Factor Impact
![Load Factor Impact](docs/load_factor_impact.png)
Performance curves demonstrating how operations degrade as load factor increases, highlighting the importance of maintaining appropriate load factors for optimal performance.
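For reference, the classic uniform-hashing bounds (CLRS Theorems 11.6 and 11.8) predict this degradation analytically. The snippet below tabulates them as a theoretical companion to the measured curves; it is not part of the benchmark code:

```python
import math

def expected_probes_unsuccessful(alpha: float) -> float:
    # Open addressing under uniform hashing: expected probes <= 1 / (1 - alpha)
    return 1 / (1 - alpha)

def expected_probes_successful(alpha: float) -> float:
    # Successful search: expected probes <= (1 / alpha) * ln(1 / (1 - alpha))
    return (1 / alpha) * math.log(1 / (1 - alpha))

for alpha in (0.25, 0.5, 0.75, 0.9):
    print(f"alpha={alpha}: unsuccessful <= {expected_probes_unsuccessful(alpha):.2f}, "
          f"successful <= {expected_probes_successful(alpha):.2f}")
```

Both bounds blow up as the load factor approaches 1, which matches the sharp upturn in the empirical plots.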
### Collision Analysis
![Collision Analysis](docs/collision_analysis.png)
Detailed comparison of collision behavior, clearly demonstrating the dramatic difference between well-designed and poorly-designed hash functions.
## Practical Applications
### Hash Tables
* **Database Systems:** Indexing, join operations
* **Caching:** Memoization, LRU caches
* **Symbol Tables:** Compilers, interpreters
* **Distributed Systems:** Consistent hashing for load balancing
* **Cryptography:** Hash-based data structures, digital signatures
### Hash Functions
* **Data Integrity:** Checksums, hash-based message authentication
* **Load Balancing:** Consistent hashing in distributed systems
* **Bloom Filters:** Probabilistic data structures
* **Database Indexing:** Hash indexes for fast lookups
## Academic Integrity Statement
This project is submitted for academic evaluation in MSCS532 Data Structures and Algorithms. All code, analysis, and documentation were authored by Carlos Gutierrez for the specific purpose of this assignment.
## References
All references are cited in REPORT.md using APA 7th edition format. Reference papers are located in the `papers/` directory.

BIN
docs/collision_analysis.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 137 KiB


BIN
docs/load_factor_impact.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 348 KiB


389
examples/generate_plots.py Normal file
View File

@@ -0,0 +1,389 @@
"""
Generate visualization plots for hash table performance analysis.
"""
import sys
import os
import matplotlib.pyplot as plt
import numpy as np
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..'))
from src.benchmark import (
benchmark_hash_functions,
benchmark_open_addressing_vs_chaining,
benchmark_load_factor_impact,
benchmark_load_factor_impact_probes,
generate_test_data
)
from src.hash_functions import (
division_hash,
multiplication_hash,
string_hash_simple,
string_hash_polynomial,
string_hash_djb2,
bad_hash_clustering
)
def plot_hash_function_comparison():
"""Compare different hash functions."""
print("Generating hash function comparison plot...")
keys = generate_test_data(1000)
table_size = 100
hash_funcs = {
'Division': division_hash,
'Multiplication': lambda k, s: multiplication_hash(k, s),
'Simple String': lambda k, s: string_hash_simple(str(k), s),
'Polynomial String': lambda k, s: string_hash_polynomial(str(k), s),
'DJB2': lambda k, s: string_hash_djb2(str(k), s),
'Bad Clustering': bad_hash_clustering,
}
results = benchmark_hash_functions(hash_funcs, keys, table_size)
fig, axes = plt.subplots(2, 2, figsize=(14, 10))
fig.suptitle('Hash Function Performance Comparison', fontsize=16, fontweight='bold')
names = list(results.keys())
collision_rates = [results[n]['collision_rate'] * 100 for n in names]
variances = [results[n]['variance'] for n in names]
times = [results[n]['time'] * 1000 for n in names] # Convert to ms
max_chains = [results[n]['max_chain_length'] for n in names]
# Collision rate
axes[0, 0].bar(names, collision_rates, color='steelblue')
axes[0, 0].set_title('Collision Rate (%)', fontweight='bold')
axes[0, 0].set_ylabel('Collision Rate (%)')
axes[0, 0].tick_params(axis='x', rotation=45)
axes[0, 0].grid(axis='y', alpha=0.3)
# Variance (distribution quality)
axes[0, 1].bar(names, variances, color='coral')
axes[0, 1].set_title('Distribution Variance (Lower is Better)', fontweight='bold')
axes[0, 1].set_ylabel('Variance')
axes[0, 1].tick_params(axis='x', rotation=45)
axes[0, 1].grid(axis='y', alpha=0.3)
# Execution time
axes[1, 0].bar(names, times, color='mediumseagreen')
axes[1, 0].set_title('Execution Time', fontweight='bold')
axes[1, 0].set_ylabel('Time (ms)')
axes[1, 0].tick_params(axis='x', rotation=45)
axes[1, 0].grid(axis='y', alpha=0.3)
# Max chain length
axes[1, 1].bar(names, max_chains, color='plum')
axes[1, 1].set_title('Maximum Chain Length', fontweight='bold')
axes[1, 1].set_ylabel('Max Chain Length')
axes[1, 1].tick_params(axis='x', rotation=45)
axes[1, 1].grid(axis='y', alpha=0.3)
plt.tight_layout()
output_path = os.path.join(os.path.dirname(__file__), '..', 'docs', 'hash_function_comparison.png')
plt.savefig(output_path, dpi=300, bbox_inches='tight')
print(f"Saved: {output_path}")
plt.close()
def plot_open_addressing_vs_chaining():
"""Compare open addressing vs separate chaining."""
print("Generating open addressing vs separate chaining comparison plot...")
sizes = [100, 500, 1000, 5000, 10000]
results = benchmark_open_addressing_vs_chaining(sizes)
fig, axes = plt.subplots(2, 2, figsize=(14, 10))
fig.suptitle('Open Addressing vs Separate Chaining Performance', fontsize=16, fontweight='bold')
sizes_arr = np.array(sizes)
# Insert time
ax = axes[0, 0]
for probe_type in ['linear', 'quadratic', 'double']:
insert_times = [r['insert_time'] for r in results['open_addressing'][probe_type]]
ax.plot(sizes_arr, insert_times, marker='o', label=f'Open Addressing ({probe_type})', linewidth=2)
insert_times_sc = [r['insert_time'] for r in results['separate_chaining']]
ax.plot(sizes_arr, insert_times_sc, marker='s', label='Separate Chaining', linewidth=2, linestyle='--')
ax.set_xlabel('Number of Elements')
ax.set_ylabel('Insert Time (seconds)')
ax.set_title('Insert Performance', fontweight='bold')
ax.legend()
ax.grid(alpha=0.3)
ax.set_xscale('log')
ax.set_yscale('log')
# Search time
ax = axes[0, 1]
for probe_type in ['linear', 'quadratic', 'double']:
search_times = [r['search_time'] for r in results['open_addressing'][probe_type]]
ax.plot(sizes_arr, search_times, marker='o', label=f'Open Addressing ({probe_type})', linewidth=2)
search_times_sc = [r['search_time'] for r in results['separate_chaining']]
ax.plot(sizes_arr, search_times_sc, marker='s', label='Separate Chaining', linewidth=2, linestyle='--')
ax.set_xlabel('Number of Elements')
ax.set_ylabel('Search Time (seconds)')
ax.set_title('Search Performance', fontweight='bold')
ax.legend()
ax.grid(alpha=0.3)
ax.set_xscale('log')
ax.set_yscale('log')
# Delete time
ax = axes[1, 0]
for probe_type in ['linear', 'quadratic', 'double']:
delete_times = [r['delete_time'] for r in results['open_addressing'][probe_type]]
ax.plot(sizes_arr, delete_times, marker='o', label=f'Open Addressing ({probe_type})', linewidth=2)
delete_times_sc = [r['delete_time'] for r in results['separate_chaining']]
ax.plot(sizes_arr, delete_times_sc, marker='s', label='Separate Chaining', linewidth=2, linestyle='--')
ax.set_xlabel('Number of Elements')
ax.set_ylabel('Delete Time (seconds)')
ax.set_title('Delete Performance', fontweight='bold')
ax.legend()
ax.grid(alpha=0.3)
ax.set_xscale('log')
ax.set_yscale('log')
# Load factors
ax = axes[1, 1]
for probe_type in ['linear', 'quadratic', 'double']:
load_factors = [r['load_factor'] for r in results['open_addressing'][probe_type]]
ax.plot(sizes_arr, load_factors, marker='o', label=f'Open Addressing ({probe_type})', linewidth=2)
load_factors_sc = [r['load_factor'] for r in results['separate_chaining']]
ax.plot(sizes_arr, load_factors_sc, marker='s', label='Separate Chaining', linewidth=2, linestyle='--')
ax.set_xlabel('Number of Elements')
ax.set_ylabel('Load Factor')
ax.set_title('Load Factor', fontweight='bold')
ax.legend()
ax.grid(alpha=0.3)
ax.set_xscale('log')
plt.tight_layout()
output_path = os.path.join(os.path.dirname(__file__), '..', 'docs', 'open_addressing_vs_chaining.png')
plt.savefig(output_path, dpi=300, bbox_inches='tight')
print(f"Saved: {output_path}")
plt.close()
def plot_load_factor_impact():
"""Plot performance at different load factors with statistical smoothing."""
print("Generating load factor impact plot...")
results = benchmark_load_factor_impact(initial_size=100, max_elements=1000, probe_type='linear', num_runs=30)
fig, axes = plt.subplots(1, 2, figsize=(14, 5))
fig.suptitle('Performance Impact of Load Factor', fontsize=16, fontweight='bold')
# Extract data
oa_data = results['open_addressing']
sc_data = results['separate_chaining']
# Sort by load factor to avoid zig-zag lines
oa_sorted = sorted(oa_data, key=lambda x: x['load_factor'])
sc_sorted = sorted(sc_data, key=lambda x: x['load_factor'])
oa_load_factors = [r['load_factor'] for r in oa_sorted]
oa_insert_times = [r['insert_time'] for r in oa_sorted]
oa_insert_stds = [r.get('insert_time_std', 0) for r in oa_sorted]
oa_search_times = [r['search_time'] for r in oa_sorted]
oa_search_stds = [r.get('search_time_std', 0) for r in oa_sorted]
sc_load_factors = [r['load_factor'] for r in sc_sorted]
sc_insert_times = [r['insert_time'] for r in sc_sorted]
sc_insert_stds = [r.get('insert_time_std', 0) for r in sc_sorted]
sc_search_times = [r['search_time'] for r in sc_sorted]
sc_search_stds = [r.get('search_time_std', 0) for r in sc_sorted]
sc_chain_lengths = [r['avg_chain_length'] for r in sc_sorted]
# Insert time vs load factor (per element) with error bars
ax = axes[0]
ax.errorbar(oa_load_factors, oa_insert_times, yerr=oa_insert_stds,
marker='o', label='Open Addressing (Linear)', linewidth=2,
capsize=3, capthick=1.5, alpha=0.8)
ax.errorbar(sc_load_factors, sc_insert_times, yerr=sc_insert_stds,
marker='s', label='Separate Chaining', linewidth=2, linestyle='--',
capsize=3, capthick=1.5, alpha=0.8)
ax.set_xlabel('Load Factor')
ax.set_ylabel('Insert Time per Element (seconds)')
ax.set_title('Insert Time vs Load Factor', fontweight='bold')
ax.legend()
ax.grid(alpha=0.3)
# Search time vs load factor (per element) with error bars
ax = axes[1]
ax.errorbar(oa_load_factors, oa_search_times, yerr=oa_search_stds,
marker='o', label='Open Addressing (Linear)', linewidth=2,
capsize=3, capthick=1.5, alpha=0.8)
ax.errorbar(sc_load_factors, sc_search_times, yerr=sc_search_stds,
marker='s', label='Separate Chaining', linewidth=2, linestyle='--',
capsize=3, capthick=1.5, alpha=0.8)
ax2 = ax.twinx()
# Chain length is smooth and accurate, so use line plot
ax2.plot(sc_load_factors, sc_chain_lengths, marker='^',
label='Avg Chain Length (SC)', color='green', linestyle=':', linewidth=2)
ax.set_xlabel('Load Factor')
ax.set_ylabel('Search Time per Element (seconds)', color='blue')
ax2.set_ylabel('Average Chain Length', color='green')
ax.set_title('Search Time vs Load Factor', fontweight='bold')
ax.legend(loc='upper left')
ax2.legend(loc='upper right')
ax.grid(alpha=0.3)
plt.tight_layout()
output_path = os.path.join(os.path.dirname(__file__), '..', 'docs', 'load_factor_impact.png')
plt.savefig(output_path, dpi=300, bbox_inches='tight')
print(f"Saved: {output_path}")
plt.close()
def plot_load_factor_impact_probes():
"""Plot probe counts and comparisons at different load factors.
Uses deterministic metrics (probe counts, comparisons) instead of timing
to produce smooth theoretical curves without measurement noise.
"""
print("Generating load factor impact plot (probe counts)...")
results = benchmark_load_factor_impact_probes(initial_size=100, max_elements=1000, probe_type='linear', num_runs=10)
fig, axes = plt.subplots(2, 2, figsize=(14, 10))
fig.suptitle('Performance Impact of Load Factor (Probe Counts & Comparisons)', fontsize=16, fontweight='bold')
# Extract data
oa_data = results['open_addressing']
sc_data = results['separate_chaining']
# Sort by load factor
oa_sorted = sorted(oa_data, key=lambda x: x['load_factor'])
sc_sorted = sorted(sc_data, key=lambda x: x['load_factor'])
oa_load_factors = [r['load_factor'] for r in oa_sorted]
oa_insert_probes = [r['insert_probes_per_element'] for r in oa_sorted]
oa_search_probes = [r['search_probes_per_element'] for r in oa_sorted]
oa_insert_comparisons = [r['insert_comparisons_per_element'] for r in oa_sorted]
oa_search_comparisons = [r['search_comparisons_per_element'] for r in oa_sorted]
sc_load_factors = [r['load_factor'] for r in sc_sorted]
sc_insert_comparisons = [r['insert_comparisons_per_element'] for r in sc_sorted]
sc_search_comparisons = [r['search_comparisons_per_element'] for r in sc_sorted]
sc_chain_lengths = [r['avg_chain_length'] for r in sc_sorted]
# Insert probes per element (Open Addressing)
ax = axes[0, 0]
ax.plot(oa_load_factors, oa_insert_probes, marker='o', label='Open Addressing (Linear)',
linewidth=2, color='blue', markersize=6)
ax.set_xlabel('Load Factor')
ax.set_ylabel('Probes per Element')
ax.set_title('Insert: Probes per Element (Open Addressing)', fontweight='bold')
ax.legend()
ax.grid(alpha=0.3)
# Search probes per element (Open Addressing)
ax = axes[0, 1]
ax.plot(oa_load_factors, oa_search_probes, marker='o', label='Open Addressing (Linear)',
linewidth=2, color='blue', markersize=6)
ax.set_xlabel('Load Factor')
ax.set_ylabel('Probes per Element')
ax.set_title('Search: Probes per Element (Open Addressing)', fontweight='bold')
ax.legend()
ax.grid(alpha=0.3)
# Comparisons per element (both methods)
ax = axes[1, 0]
ax.plot(oa_load_factors, oa_insert_comparisons, marker='o', label='Open Addressing (Linear)',
linewidth=2, color='blue', markersize=6)
ax.plot(sc_load_factors, sc_insert_comparisons, marker='s', label='Separate Chaining',
linewidth=2, linestyle='--', color='orange', markersize=6)
ax.set_xlabel('Load Factor')
ax.set_ylabel('Comparisons per Element')
ax.set_title('Insert: Comparisons per Element', fontweight='bold')
ax.legend()
ax.grid(alpha=0.3)
# Search comparisons per element and chain length
ax = axes[1, 1]
ax.plot(oa_load_factors, oa_search_comparisons, marker='o', label='Open Addressing (Linear)',
linewidth=2, color='blue', markersize=6)
ax.plot(sc_load_factors, sc_search_comparisons, marker='s', label='Separate Chaining',
linewidth=2, linestyle='--', color='orange', markersize=6)
ax2 = ax.twinx()
ax2.plot(sc_load_factors, sc_chain_lengths, marker='^', label='Avg Chain Length (SC)',
color='green', linestyle=':', linewidth=2, markersize=6)
ax.set_xlabel('Load Factor')
ax.set_ylabel('Comparisons per Element', color='blue')
ax2.set_ylabel('Average Chain Length', color='green')
ax.set_title('Search: Comparisons per Element', fontweight='bold')
ax.legend(loc='upper left')
ax2.legend(loc='upper right')
ax.grid(alpha=0.3)
plt.tight_layout()
output_path = os.path.join(os.path.dirname(__file__), '..', 'docs', 'load_factor_impact_probes.png')
plt.savefig(output_path, dpi=300, bbox_inches='tight')
print(f"Saved: {output_path}")
plt.close()
def plot_collision_analysis():
"""Plot collision analysis for different hash functions."""
print("Generating collision analysis plot...")
keys = generate_test_data(500)
table_size = 100
hash_funcs = {
'Division': division_hash,
'Multiplication': lambda k, s: multiplication_hash(k, s),
'Simple String': lambda k, s: string_hash_simple(str(k), s),
'Polynomial': lambda k, s: string_hash_polynomial(str(k), s),
'Bad Clustering': bad_hash_clustering,
}
results = benchmark_hash_functions(hash_funcs, keys, table_size)
fig, ax = plt.subplots(figsize=(12, 6))
names = list(results.keys())
collision_counts = [results[n]['collisions'] for n in names]
colors = ['steelblue' if 'Bad' not in n else 'coral' for n in names]
bars = ax.bar(names, collision_counts, color=colors)
ax.set_xlabel('Hash Function', fontweight='bold')
ax.set_ylabel('Number of Collisions', fontweight='bold')
ax.set_title('Collision Analysis: Good vs Bad Hash Functions', fontsize=14, fontweight='bold')
ax.grid(axis='y', alpha=0.3)
ax.tick_params(axis='x', rotation=45)
# Add value labels on bars
for bar in bars:
height = bar.get_height()
ax.text(bar.get_x() + bar.get_width()/2., height,
f'{int(height)}',
ha='center', va='bottom', fontweight='bold')
plt.tight_layout()
output_path = os.path.join(os.path.dirname(__file__), '..', 'docs', 'collision_analysis.png')
plt.savefig(output_path, dpi=300, bbox_inches='tight')
print(f"Saved: {output_path}")
plt.close()
if __name__ == "__main__":
# Ensure docs directory exists
os.makedirs(os.path.join(os.path.dirname(__file__), '..', 'docs'), exist_ok=True)
print("Generating visualization plots...")
print("This may take a few minutes...\n")
plot_hash_function_comparison()
plot_open_addressing_vs_chaining()
plot_load_factor_impact()
plot_load_factor_impact_probes()
plot_collision_analysis()
print("\nAll plots generated successfully!")

207
examples/hash_tables_demo.py Normal file
View File

@@ -0,0 +1,207 @@
"""
Demonstration of hash table implementations and their usage.
"""
import sys
import os
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..'))
from src.hash_tables import (
DirectAddressTable,
HashTableOpenAddressing,
HashTableSeparateChaining
)
from src.hash_functions import (
division_hash,
string_hash_polynomial,
string_hash_simple
)
def demo_direct_address_table():
"""Demonstrate direct-address table."""
print("=" * 60)
print("Direct-Address Table Demonstration")
print("=" * 60)
# Create table for keys in range [0, 99]
table = DirectAddressTable(100)
# Insert some values
table.insert(5, "Alice")
table.insert(42, "Bob")
table.insert(99, "Charlie")
print("\nInserted key-value pairs:")
print(" Key 5 ->", table.search(5))
print(" Key 42 ->", table.search(42))
print(" Key 99 ->", table.search(99))
# Search
print("\nSearching for key 42:", table.search(42))
print("Searching for key 10:", table.search(10)) # Not found
# Delete
table.delete(42)
print("\nAfter deleting key 42:")
print(" Key 42 ->", table.search(42)) # None
print()
def demo_open_addressing():
"""Demonstrate open addressing hash table."""
print("=" * 60)
print("Open Addressing Hash Table Demonstration")
print("=" * 60)
# Test with linear probing
print("\n--- Linear Probing ---")
ht_linear = HashTableOpenAddressing(10, probe_type='linear')
keys = [10, 22, 31, 4, 15, 28, 17, 88, 59]
for key in keys:
ht_linear.insert(key, f"Value_{key}")
print(f"Inserted {len(keys)} keys")
print(f"Load factor: {ht_linear._load_factor():.2f}")
print("\nSearching for keys:")
for key in [10, 22, 88, 99]:
result = ht_linear.search(key)
print(f" Key {key}: {'Found' if result else 'Not found'}")
# Test with quadratic probing
print("\n--- Quadratic Probing ---")
# Use larger table size for quadratic probing to avoid probe sequence issues
ht_quad = HashTableOpenAddressing(20, probe_type='quadratic')
for key in keys:
ht_quad.insert(key, f"Value_{key}")
print(f"Inserted {len(keys)} keys")
print(f"Load factor: {ht_quad._load_factor():.2f}")
# Test with double hashing
print("\n--- Double Hashing ---")
# Use larger table size for double hashing to ensure all keys can be inserted
ht_double = HashTableOpenAddressing(20, probe_type='double')
for key in keys:
ht_double.insert(key, f"Value_{key}")
print(f"Inserted {len(keys)} keys")
print(f"Load factor: {ht_double._load_factor():.2f}")
print()
def demo_separate_chaining():
"""Demonstrate separate chaining hash table."""
print("=" * 60)
print("Separate Chaining Hash Table Demonstration")
print("=" * 60)
ht = HashTableSeparateChaining(10)
keys = [10, 22, 31, 4, 15, 28, 17, 88, 59, 71]
for key in keys:
ht.insert(key, f"Value_{key}")
print(f"\nInserted {len(keys)} keys")
print(f"Load factor: {ht._load_factor():.2f}")
chain_lengths = ht.get_chain_lengths()
print(f"Chain lengths: {chain_lengths}")
print(f"Average chain length: {sum(chain_lengths) / len(chain_lengths):.2f}")
print(f"Maximum chain length: {max(chain_lengths)}")
print("\nSearching for keys:")
for key in [10, 22, 88, 99]:
result = ht.search(key)
print(f" Key {key}: {'Found' if result else 'Not found'}")
# Delete some keys
print("\nDeleting keys 22 and 88:")
ht.delete(22)
ht.delete(88)
print(f" Key 22: {'Found' if ht.search(22) else 'Not found'}")
print(f" Key 88: {'Found' if ht.search(88) else 'Not found'}")
print()
def demo_hash_functions():
"""Demonstrate different hash functions."""
print("=" * 60)
print("Hash Function Demonstration")
print("=" * 60)
keys = [10, 22, 31, 4, 15, 28, 17, 88, 59, 71]
table_size = 11
print(f"\nKeys: {keys}")
print(f"Table size: {table_size}\n")
# Division method
print("Division method (h(k) = k mod m):")
for key in keys[:5]:
hash_val = division_hash(key, table_size)
print(f" h({key}) = {hash_val}")
# String hashing
print("\nString hash functions:")
string_keys = ["hello", "world", "hash", "table", "test"]
print("Simple string hash (BAD - prone to collisions):")
for key in string_keys:
hash_val = string_hash_simple(key, table_size)
print(f" h('{key}') = {hash_val}")
print("\nPolynomial string hash (GOOD - better distribution):")
for key in string_keys:
hash_val = string_hash_polynomial(key, table_size)
print(f" h('{key}') = {hash_val}")
print()
def demo_collision_comparison():
"""Demonstrate collision behavior with different hash functions."""
print("=" * 60)
print("Collision Comparison Demonstration")
print("=" * 60)
# Generate test keys
keys = list(range(100, 200))
table_size = 50
from src.hash_functions import (
division_hash,
multiplication_hash,
string_hash_simple,
string_hash_polynomial
)
hash_funcs = {
'Division': division_hash,
'Multiplication': lambda k, s: multiplication_hash(k, s),
}
print(f"\nTesting with {len(keys)} keys and table size {table_size}\n")
for name, hash_func in hash_funcs.items():
hash_values = [hash_func(k, table_size) for k in keys]
collisions = len(keys) - len(set(hash_values))
collision_rate = collisions / len(keys) * 100
print(f"{name} method:")
print(f" Collisions: {collisions}")
print(f" Collision rate: {collision_rate:.2f}%")
print(f" Buckets used: {len(set(hash_values))}/{table_size}")
print()
if __name__ == "__main__":
demo_direct_address_table()
demo_open_addressing()
demo_separate_chaining()
demo_hash_functions()
demo_collision_comparison()


File diff suppressed because one or more lines are too long


4
requirements.txt Normal file
View File

@@ -0,0 +1,4 @@
pytest>=7.4.0
matplotlib>=3.7.0
numpy>=1.24.0

40
src/__init__.py Normal file
View File

@@ -0,0 +1,40 @@
"""
Hash Tables and Hash Functions Package
This package provides implementations of various hash table data structures
and hash functions for educational and research purposes.
"""
from .hash_functions import (
division_hash,
multiplication_hash,
universal_hash,
string_hash_simple,
string_hash_polynomial,
string_hash_djb2,
md5_hash,
bad_hash_clustering,
get_hash_function
)
from .hash_tables import (
DirectAddressTable,
HashTableOpenAddressing,
HashTableSeparateChaining
)
__all__ = [
'division_hash',
'multiplication_hash',
'universal_hash',
'string_hash_simple',
'string_hash_polynomial',
'string_hash_djb2',
'md5_hash',
'bad_hash_clustering',
'get_hash_function',
'DirectAddressTable',
'HashTableOpenAddressing',
'HashTableSeparateChaining',
]

570
src/benchmark.py Normal file
View File

@@ -0,0 +1,570 @@
"""
Benchmarking utilities for hash table performance analysis.
"""
import time
import random
import statistics
from typing import List, Dict, Any, Callable, Tuple
from .hash_tables import HashTableOpenAddressing, HashTableSeparateChaining
from .hash_functions import (
division_hash,
multiplication_hash,
string_hash_polynomial,
string_hash_simple,
bad_hash_clustering
)
def benchmark_insert(
hash_table: Any,
keys: List[int],
values: List[Any] = None
) -> float:
"""
Benchmark insertion operations.
Args:
hash_table: Hash table instance
keys: List of keys to insert
values: Optional list of values (defaults to same as keys)
Returns:
Time taken in seconds
"""
if values is None:
values = keys
start = time.perf_counter()
for key, value in zip(keys, values):
hash_table.insert(key, value)
end = time.perf_counter()
return end - start
def benchmark_search(
hash_table: Any,
keys: List[int]
) -> Tuple[float, int]:
"""
Benchmark search operations.
Args:
hash_table: Hash table instance
keys: List of keys to search for
Returns:
Tuple of (time taken in seconds, number of successful searches)
"""
start = time.perf_counter()
found = 0
for key in keys:
if hash_table.search(key) is not None:
found += 1
end = time.perf_counter()
return end - start, found
def benchmark_delete(
hash_table: Any,
keys: List[int]
) -> Tuple[float, int]:
"""
Benchmark delete operations.
Args:
hash_table: Hash table instance
keys: List of keys to delete
Returns:
Tuple of (time taken in seconds, number of successful deletions)
"""
start = time.perf_counter()
deleted = 0
for key in keys:
if hash_table.delete(key):
deleted += 1
end = time.perf_counter()
return end - start, deleted
def generate_test_data(n: int, key_range: Tuple[int, int] = None, seed: int = 42) -> List[int]:
"""
Generate test data for benchmarking.
Args:
n: Number of keys to generate
key_range: Optional tuple (min, max) for key range
seed: Random seed (fixed by default for reproducibility; pass a
different seed per run to vary the data across repeated runs)
Returns:
List of random keys
"""
if key_range is None:
key_range = (0, n * 10)
random.seed(seed)
return [random.randint(key_range[0], key_range[1]) for _ in range(n)]
def benchmark_hash_functions(
hash_funcs: Dict[str, Callable],
keys: List[int],
table_size: int
) -> Dict[str, Dict[str, Any]]:
"""
Benchmark different hash functions.
Args:
hash_funcs: Dictionary mapping function names to hash functions
keys: List of keys to hash
table_size: Size of hash table
Returns:
Dictionary with benchmark results including collision counts
"""
results = {}
for name, hash_func in hash_funcs.items():
start = time.perf_counter()
hash_values = [hash_func(k, table_size) for k in keys]
end = time.perf_counter()
# Count collisions
collision_count = len(keys) - len(set(hash_values))
collision_rate = collision_count / len(keys) if keys else 0
# Calculate distribution (variance of bucket sizes)
bucket_counts = {}
for hv in hash_values:
bucket_counts[hv] = bucket_counts.get(hv, 0) + 1
bucket_sizes = list(bucket_counts.values())
if bucket_sizes:
avg_bucket_size = sum(bucket_sizes) / len(bucket_sizes)
variance = sum((x - avg_bucket_size) ** 2 for x in bucket_sizes) / len(bucket_sizes)
else:
variance = 0
results[name] = {
'time': end - start,
'collisions': collision_count,
'collision_rate': collision_rate,
'variance': variance,
'buckets_used': len(bucket_counts),
'max_chain_length': max(bucket_sizes) if bucket_sizes else 0
}
return results
def benchmark_open_addressing_vs_chaining(
sizes: List[int],
probe_types: Tuple[str, ...] = ('linear', 'quadratic', 'double')
) -> Dict[str, List[Dict[str, Any]]]:
"""
Compare open addressing (different probe types) vs separate chaining.
Args:
sizes: List of data sizes to test
probe_types: List of probe types to test
Returns:
Dictionary with benchmark results
"""
results = {
'open_addressing': {pt: [] for pt in probe_types},
'separate_chaining': []
}
for size in sizes:
keys = generate_test_data(size)
table_size = int(size * 1.5)  # Table 1.5x the data size, so load factor ~ 0.67
# Test open addressing with different probe types
for probe_type in probe_types:
ht = HashTableOpenAddressing(table_size, probe_type=probe_type)
insert_time = benchmark_insert(ht, keys)
search_time, found = benchmark_search(ht, keys[:size//2])
delete_time, deleted = benchmark_delete(ht, keys[:size//4])
load_factor = ht._load_factor()
results['open_addressing'][probe_type].append({
'size': size,
'insert_time': insert_time,
'search_time': search_time,
'delete_time': delete_time,
'load_factor': load_factor,
'found': found,
'deleted': deleted
})
# Test separate chaining
ht = HashTableSeparateChaining(table_size)
insert_time = benchmark_insert(ht, keys)
search_time, found = benchmark_search(ht, keys[:size//2])
delete_time, deleted = benchmark_delete(ht, keys[:size//4])
chain_lengths = ht.get_chain_lengths()
avg_chain_length = sum(chain_lengths) / len(chain_lengths) if chain_lengths else 0
max_chain_length = max(chain_lengths) if chain_lengths else 0
results['separate_chaining'].append({
'size': size,
'insert_time': insert_time,
'search_time': search_time,
'delete_time': delete_time,
'load_factor': ht._load_factor(),
'found': found,
'deleted': deleted,
'avg_chain_length': avg_chain_length,
'max_chain_length': max_chain_length
})
return results
def benchmark_load_factor_impact(
initial_size: int,
max_elements: int,
probe_type: str = 'linear',
num_runs: int = 5
) -> Dict[str, List[Dict[str, Any]]]:
"""
Benchmark performance at different load factors with multiple runs for statistical accuracy.
Args:
initial_size: Initial hash table size
max_elements: Maximum number of elements to insert
probe_type: Probe type for open addressing
num_runs: Number of runs per load factor for averaging
Returns:
Dictionary with results for open addressing and separate chaining
"""
results = {
'open_addressing': [],
'separate_chaining': []
}
num_samples = 10
batch_size = max_elements // num_samples
# Test open addressing
for i in range(0, max_elements, batch_size):
if i + batch_size > max_elements:
continue
# Run multiple times to get statistical averages
insert_times = []
search_times = []
load_factors = []
for run in range(num_runs):
keys = generate_test_data(max_elements)
ht_oa = HashTableOpenAddressing(initial_size, probe_type=probe_type)
inserted_keys_oa = []
# Insert up to (but not including) current batch
for j in range(0, i, batch_size):
batch_keys = keys[j:j+batch_size]
if not batch_keys:
continue
for key in batch_keys:
ht_oa.insert(key, key)
inserted_keys_oa.extend(batch_keys)
# Measure insert time for this batch (normalized per element)
batch_keys = keys[i:i+batch_size]
if batch_keys:
batch_start = time.perf_counter()
for key in batch_keys:
ht_oa.insert(key, key)
batch_end = time.perf_counter()
insert_time_per_element = (batch_end - batch_start) / len(batch_keys)
insert_times.append(insert_time_per_element)
inserted_keys_oa.extend(batch_keys)
# Benchmark search on a sample of ALL inserted keys
search_sample_size = min(100, len(inserted_keys_oa))
search_keys = inserted_keys_oa[:search_sample_size] if inserted_keys_oa else []
if search_keys:
search_time, _ = benchmark_search(ht_oa, search_keys)
search_time_per_element = search_time / len(search_keys)
search_times.append(search_time_per_element)
load_factors.append(ht_oa._load_factor())
# Compute statistics
if insert_times and search_times:
avg_insert = statistics.mean(insert_times)  # statistics is imported at module level
std_insert = statistics.stdev(insert_times) if len(insert_times) > 1 else 0
avg_search = statistics.mean(search_times)
std_search = statistics.stdev(search_times) if len(search_times) > 1 else 0
avg_load_factor = statistics.mean(load_factors)
results['open_addressing'].append({
'elements': i + batch_size,
'load_factor': avg_load_factor,
'insert_time': avg_insert,
'insert_time_std': std_insert,
'search_time': avg_search,
'search_time_std': std_search
})
# Test separate chaining
for i in range(0, max_elements, batch_size):
if i + batch_size > max_elements:
continue
# Run multiple times to get statistical averages
insert_times = []
search_times = []
load_factors = []
chain_lengths_list = []
for run in range(num_runs):
keys = generate_test_data(max_elements)
ht_sc = HashTableSeparateChaining(initial_size)
inserted_keys_sc = []
# Insert up to (but not including) current batch
for j in range(0, i, batch_size):
batch_keys = keys[j:j+batch_size]
if not batch_keys:
continue
for key in batch_keys:
ht_sc.insert(key, key)
inserted_keys_sc.extend(batch_keys)
# Measure insert time for this batch (normalized per element)
batch_keys = keys[i:i+batch_size]
if batch_keys:
batch_start = time.perf_counter()
for key in batch_keys:
ht_sc.insert(key, key)
batch_end = time.perf_counter()
insert_time_per_element = (batch_end - batch_start) / len(batch_keys)
insert_times.append(insert_time_per_element)
inserted_keys_sc.extend(batch_keys)
# Benchmark search on a sample of ALL inserted keys
search_sample_size = min(100, len(inserted_keys_sc))
search_keys = inserted_keys_sc[:search_sample_size] if inserted_keys_sc else []
if search_keys:
search_time, _ = benchmark_search(ht_sc, search_keys)
search_time_per_element = search_time / len(search_keys)
search_times.append(search_time_per_element)
chain_lengths = ht_sc.get_chain_lengths()
# Calculate average chain length only for non-empty buckets
non_empty_lengths = [l for l in chain_lengths if l > 0]
avg_chain_length = sum(non_empty_lengths) / len(non_empty_lengths) if non_empty_lengths else 0
chain_lengths_list.append(avg_chain_length)
load_factors.append(ht_sc._load_factor())
# Compute statistics
if insert_times and search_times:
avg_insert = statistics.mean(insert_times)
std_insert = statistics.stdev(insert_times) if len(insert_times) > 1 else 0
avg_search = statistics.mean(search_times)
std_search = statistics.stdev(search_times) if len(search_times) > 1 else 0
avg_load_factor = statistics.mean(load_factors)
avg_chain_length = statistics.mean(chain_lengths_list)
results['separate_chaining'].append({
'elements': i + batch_size,
'load_factor': avg_load_factor,
'insert_time': avg_insert,
'insert_time_std': std_insert,
'search_time': avg_search,
'search_time_std': std_search,
'avg_chain_length': avg_chain_length
})
return results
def benchmark_load_factor_impact_probes(
initial_size: int,
max_elements: int,
probe_type: str = 'linear',
num_runs: int = 10
) -> Dict[str, List[Dict[str, Any]]]:
"""
Benchmark probe counts and comparisons at different load factors.
Uses deterministic metrics instead of timing for smooth theoretical curves.
Args:
initial_size: Initial hash table size
max_elements: Maximum number of elements to insert
probe_type: Probe type for open addressing
num_runs: Number of runs per load factor for averaging
Returns:
Dictionary with results for open addressing and separate chaining
"""
results = {
'open_addressing': [],
'separate_chaining': []
}
num_samples = 10
batch_size = max_elements // num_samples
# Test open addressing
for i in range(0, max_elements, batch_size):
if i + batch_size > max_elements:
continue
# Run multiple times to get statistical averages
insert_probes = []
search_probes = []
insert_comparisons = []
search_comparisons = []
load_factors = []
search_sample_size = 100 # Fixed sample size for normalization
for run in range(num_runs):
keys = generate_test_data(max_elements)
ht_oa = HashTableOpenAddressing(initial_size, probe_type=probe_type)
inserted_keys_oa = []
# Insert up to (but not including) current batch
for j in range(0, i, batch_size):
batch_keys = keys[j:j+batch_size]
if not batch_keys:
continue
for key in batch_keys:
ht_oa.insert(key, key)
inserted_keys_oa.extend(batch_keys)
# Reset counters before measuring current batch
ht_oa.reset_counts()
# Measure insert probes/comparisons for this batch
batch_keys = keys[i:i+batch_size]
if batch_keys:
for key in batch_keys:
ht_oa.insert(key, key)
inserted_keys_oa.extend(batch_keys)
insert_probes.append(ht_oa.get_probe_count())
insert_comparisons.append(ht_oa.get_comparison_count())
# Reset counters for search
ht_oa.reset_counts()
# Benchmark search on a sample of ALL inserted keys
actual_search_size = min(search_sample_size, len(inserted_keys_oa))
search_keys = inserted_keys_oa[:actual_search_size] if inserted_keys_oa else []
if search_keys:
for key in search_keys:
ht_oa.search(key)
search_probes.append(ht_oa.get_probe_count())
search_comparisons.append(ht_oa.get_comparison_count())
load_factors.append(ht_oa._load_factor())
# Compute statistics
if insert_probes and search_probes:
# Normalize by batch size and search sample size (fixed at 100)
avg_insert_probes = statistics.mean(insert_probes) / batch_size if insert_probes and batch_size > 0 else 0
avg_search_probes = statistics.mean(search_probes) / search_sample_size if search_probes and search_sample_size > 0 else 0
avg_insert_comparisons = statistics.mean(insert_comparisons) / batch_size if insert_comparisons and batch_size > 0 else 0
avg_search_comparisons = statistics.mean(search_comparisons) / search_sample_size if search_comparisons and search_sample_size > 0 else 0
avg_load_factor = statistics.mean(load_factors)
results['open_addressing'].append({
'elements': i + batch_size,
'load_factor': avg_load_factor,
'insert_probes_per_element': avg_insert_probes,
'search_probes_per_element': avg_search_probes,
'insert_comparisons_per_element': avg_insert_comparisons,
'search_comparisons_per_element': avg_search_comparisons
})
# Test separate chaining
for i in range(0, max_elements, batch_size):
if i + batch_size > max_elements:
continue
# Run multiple times to get statistical averages
insert_comparisons = []
search_comparisons = []
load_factors = []
chain_lengths_list = []
search_sample_size = 100 # Fixed sample size for normalization
for run in range(num_runs):
keys = generate_test_data(max_elements)
ht_sc = HashTableSeparateChaining(initial_size)
inserted_keys_sc = []
# Insert up to (but not including) current batch
for j in range(0, i, batch_size):
batch_keys = keys[j:j+batch_size]
if not batch_keys:
continue
for key in batch_keys:
ht_sc.insert(key, key)
inserted_keys_sc.extend(batch_keys)
# Reset counters before measuring current batch
ht_sc.reset_counts()
# Measure insert comparisons for this batch
batch_keys = keys[i:i+batch_size]
if batch_keys:
for key in batch_keys:
ht_sc.insert(key, key)
inserted_keys_sc.extend(batch_keys)
insert_comparisons.append(ht_sc.get_comparison_count())
# Reset counters for search
ht_sc.reset_counts()
# Benchmark search on a sample of ALL inserted keys
actual_search_size = min(search_sample_size, len(inserted_keys_sc))
search_keys = inserted_keys_sc[:actual_search_size] if inserted_keys_sc else []
if search_keys:
for key in search_keys:
ht_sc.search(key)
search_comparisons.append(ht_sc.get_comparison_count())
chain_lengths = ht_sc.get_chain_lengths()
# Calculate average chain length only for non-empty buckets
non_empty_lengths = [l for l in chain_lengths if l > 0]
avg_chain_length = sum(non_empty_lengths) / len(non_empty_lengths) if non_empty_lengths else 0
chain_lengths_list.append(avg_chain_length)
load_factors.append(ht_sc._load_factor())
# Compute statistics
if insert_comparisons and search_comparisons:
# Normalize by batch size and search sample size (fixed at 100)
avg_insert_comparisons = statistics.mean(insert_comparisons) / batch_size if insert_comparisons and batch_size > 0 else 0
avg_search_comparisons = statistics.mean(search_comparisons) / search_sample_size if search_comparisons and search_sample_size > 0 else 0
avg_load_factor = statistics.mean(load_factors)
avg_chain_length = statistics.mean(chain_lengths_list)
results['separate_chaining'].append({
'elements': i + batch_size,
'load_factor': avg_load_factor,
'insert_comparisons_per_element': avg_insert_comparisons,
'search_comparisons_per_element': avg_search_comparisons,
'avg_chain_length': avg_chain_length
})
return results
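`benchmark_hash_functions` above counts a collision whenever two keys share a hash value (`len(keys) - len(set(hash_values))`). A self-contained sketch of that metric, with the two integer hash functions reimplemented inline so the snippet runs on its own:

```python
# Standalone sketch of the collision metric used in benchmark_hash_functions,
# comparing the division method against the deliberately bad clustering hash.
import random

def division_hash(k, m):
    return k % m

def bad_hash_clustering(k, m):
    return (k * m) % m  # always 0: every key lands in bucket 0

random.seed(42)
keys = [random.randint(0, 1000) for _ in range(100)]
m = 31

results = {}
for fn in (division_hash, bad_hash_clustering):
    hv = [fn(k, m) for k in keys]
    results[fn.__name__] = len(keys) - len(set(hv))  # collisions

print(results)  # bad_hash_clustering collides on 99 of 100 keys
```

With 100 keys and only one bucket ever used, the bad hash produces 99 collisions; the division method spreads keys over up to 31 buckets.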

183
src/hash_functions.py Normal file
View File

@@ -0,0 +1,183 @@
"""
Hash Functions Module
This module implements various hash functions, including good and bad examples
to demonstrate the impact of hash function design on hash table performance.
"""
import hashlib
from typing import Any, Callable, Optional
def division_hash(key: int, table_size: int) -> int:
"""
Division method hash function: h(k) = k mod m
Simple and fast, but requires careful choice of table size (preferably prime).
Args:
key: The key to hash
table_size: Size of the hash table
Returns:
Hash value in range [0, table_size-1]
"""
return key % table_size
def multiplication_hash(key: int, table_size: int, A: float = 0.6180339887) -> int:
"""
Multiplication method hash function: h(k) = floor(m * (kA mod 1))
A good choice of A (often (sqrt(5)-1)/2) helps distribute keys uniformly.
Args:
key: The key to hash
table_size: Size of the hash table
A: Multiplier constant (default: (sqrt(5)-1)/2)
Returns:
Hash value in range [0, table_size-1]
"""
return int(table_size * ((key * A) % 1))
def universal_hash(key: int, table_size: int, a: int, b: int, p: int) -> int:
"""
Universal hash function: h(k) = ((a*k + b) mod p) mod m
Part of a universal class of hash functions that minimizes collisions
for any set of keys.
Args:
key: The key to hash
table_size: Size of the hash table
a: Random parameter (1 <= a < p)
b: Random parameter (0 <= b < p)
p: Large prime number (p > max_key)
Returns:
Hash value in range [0, table_size-1]
"""
return ((a * key + b) % p) % table_size
def string_hash_simple(key: str, table_size: int) -> int:
"""
Simple string hash function (BAD EXAMPLE - prone to collisions).
This is a naive implementation that sums character values.
Poor distribution for similar strings.
Args:
key: String key to hash
table_size: Size of the hash table
Returns:
Hash value in range [0, table_size-1]
"""
hash_value = 0
for char in key:
hash_value += ord(char)
return hash_value % table_size
def string_hash_polynomial(key: str, table_size: int, base: int = 31) -> int:
"""
Polynomial rolling hash function (GOOD EXAMPLE).
Uses polynomial accumulation: h(s) = (s[0]*b^(n-1) + s[1]*b^(n-2) + ... + s[n-1]) mod m
Better distribution than simple summation.
Args:
key: String key to hash
table_size: Size of the hash table
base: Base for polynomial (default: 31)
Returns:
Hash value in range [0, table_size-1]
"""
hash_value = 0
for char in key:
hash_value = (hash_value * base + ord(char)) % table_size
return hash_value
def string_hash_djb2(key: str, table_size: int) -> int:
"""
DJB2 hash function - a popular string hash function.
Known for good distribution properties.
Args:
key: String key to hash
table_size: Size of the hash table
Returns:
Hash value in range [0, table_size-1]
"""
hash_value = 5381
for char in key:
hash_value = ((hash_value << 5) + hash_value) + ord(char)
return hash_value % table_size
def md5_hash(key: str, table_size: int) -> int:
"""
MD5-based hash function (cryptographically secure but slower).
Provides excellent distribution but computationally expensive.
Demonstrates trade-off between speed and quality.
Args:
key: String key to hash
table_size: Size of the hash table
Returns:
Hash value in range [0, table_size-1]
"""
md5_hash_obj = hashlib.md5(key.encode('utf-8'))
hash_int = int(md5_hash_obj.hexdigest(), 16)
return hash_int % table_size
def bad_hash_clustering(key: int, table_size: int) -> int:
"""
BAD EXAMPLE: Hash function that causes extreme clustering.
Multiplying the key by table_size before reducing mod table_size
yields 0 for every key, so all keys collide in a single bucket.
Args:
key: The key to hash
table_size: Size of the hash table
Returns:
Hash value (poorly distributed)
"""
# Degenerate choice: (key * table_size) % table_size is always 0,
# so every key collides in bucket 0
return (key * table_size) % table_size
def get_hash_function(hash_type: str) -> Callable:
"""
Get a hash function by name.
Args:
hash_type: Name of the hash function: 'division', 'multiplication',
'string_simple', 'string_polynomial', 'string_djb2', 'md5', or
'bad_clustering'. (universal_hash is not registered here because
it requires the extra parameters a, b, and p.)
Returns:
Hash function callable; defaults to division_hash for unknown names
"""
hash_functions = {
'division': division_hash,
'multiplication': multiplication_hash,
'string_simple': string_hash_simple,
'string_polynomial': string_hash_polynomial,
'string_djb2': string_hash_djb2,
'md5': md5_hash,
'bad_clustering': bad_hash_clustering,
}
return hash_functions.get(hash_type, division_hash)
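As a quick sanity check of the two "good" examples above, here is a standalone sketch that reimplements `multiplication_hash` and `string_hash_polynomial` inline and prints sample values (the expected outputs below were computed by hand for these specific inputs):

```python
# Inline copies of multiplication_hash and string_hash_polynomial
# (src/hash_functions.py) for a quick standalone check.
A = 0.6180339887  # (sqrt(5) - 1) / 2, Knuth's suggested multiplier

def multiplication_hash(key: int, m: int) -> int:
    return int(m * ((key * A) % 1))

def string_hash_polynomial(s: str, m: int, base: int = 31) -> int:
    h = 0
    for ch in s:
        h = (h * base + ord(ch)) % m  # rolling accumulation keeps h < m
    return h

print(multiplication_hash(10, 8))            # 1
print(string_hash_polynomial("hello", 11))   # 6
```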

413
src/hash_tables.py Normal file
View File

@@ -0,0 +1,413 @@
"""
Hash Tables Module
This module implements various hash table data structures including:
- Direct-address tables
- Open addressing (linear probing, quadratic probing, double hashing)
- Separate chaining
"""
from typing import Any, Optional, Tuple, List, Callable
from .hash_functions import division_hash, get_hash_function
class DirectAddressTable:
"""
Direct-address table implementation.
Assumes keys are integers in a small range [0, m-1].
Provides O(1) operations but requires keys to be in a known range.
"""
def __init__(self, size: int):
"""
Initialize direct-address table.
Args:
size: Maximum key value (keys must be in range [0, size-1])
"""
self.size = size
self.table: List[Optional[Any]] = [None] * size
def insert(self, key: int, value: Any) -> None:
"""
Insert key-value pair.
Args:
key: Integer key (must be in range [0, size-1])
value: Value to store
"""
if not (0 <= key < self.size):
raise ValueError(f"Key {key} out of range [0, {self.size-1}]")
self.table[key] = value
def search(self, key: int) -> Optional[Any]:
"""
Search for value by key.
Args:
key: Integer key to search for
Returns:
Value if found, None otherwise
"""
if not (0 <= key < self.size):
return None
return self.table[key]
def delete(self, key: int) -> None:
"""
Delete key-value pair.
Args:
key: Integer key to delete
"""
if 0 <= key < self.size:
self.table[key] = None
class HashTableOpenAddressing:
"""
Hash table using open addressing with multiple probing strategies.
Supports linear probing, quadratic probing, and double hashing.
"""
DELETED = object() # Sentinel for deleted entries
def __init__(
self,
size: int,
hash_func: Optional[Callable] = None,
probe_type: str = 'linear',
load_factor_threshold: float = 0.75
):
"""
Initialize hash table with open addressing.
Args:
size: Initial size of hash table
hash_func: Hash function to use (default: division method)
probe_type: Type of probing ('linear', 'quadratic', 'double')
load_factor_threshold: Maximum load factor before resizing
"""
self.size = size
self.count = 0
self.table: List[Optional[Tuple[int, Any]]] = [None] * size
self.hash_func = hash_func or (lambda k, s: division_hash(k, s))
self.probe_type = probe_type
self.load_factor_threshold = load_factor_threshold
self._probe_count = 0
self._comparison_count = 0
def _load_factor(self) -> float:
"""Calculate current load factor."""
return self.count / self.size
def _linear_probe(self, key: int, i: int) -> int:
"""Linear probing: h(k,i) = (h'(k) + i) mod m"""
h1 = self.hash_func(key, self.size)
return (h1 + i) % self.size
def _quadratic_probe(self, key: int, i: int) -> int:
"""Quadratic probing: h(k,i) = (h'(k) + c1*i + c2*i^2) mod m"""
h1 = self.hash_func(key, self.size)
c1, c2 = 1, 1
return (h1 + c1 * i + c2 * i * i) % self.size
def _double_hash(self, key: int, i: int) -> int:
"""Double hashing: h(k,i) = (h1(k) + i*h2(k)) mod m"""
h1 = self.hash_func(key, self.size)
# Second hash function: h2(k) = 1 + (k mod (m-1))
h2 = 1 + (key % (self.size - 1))
return (h1 + i * h2) % self.size
def _probe(self, key: int, i: int) -> int:
"""Get probe sequence index based on probe type."""
if self.probe_type == 'linear':
return self._linear_probe(key, i)
elif self.probe_type == 'quadratic':
return self._quadratic_probe(key, i)
elif self.probe_type == 'double':
return self._double_hash(key, i)
else:
raise ValueError(f"Unknown probe type: {self.probe_type}")
def _resize(self) -> None:
"""Resize table when load factor exceeds threshold."""
old_table = self.table
old_size = self.size
self.size *= 2
self.count = 0
self.table = [None] * self.size
# Rehash all existing entries
for entry in old_table:
if entry is not None and entry is not self.DELETED:
key, value = entry
self.insert(key, value)
def insert(self, key: int, value: Any) -> None:
"""
Insert key-value pair using open addressing.
Args:
key: Key to insert
value: Value to store
"""
if self._load_factor() >= self.load_factor_threshold:
self._resize()
i = 0
while i < self.size:
index = self._probe(key, i)
self._probe_count += 1
entry = self.table[index]
if entry is None or entry is self.DELETED:
self.table[index] = (key, value)
self.count += 1
return
elif entry[0] == key:
self._comparison_count += 1
# Update existing key
self.table[index] = (key, value)
return
else:
self._comparison_count += 1
i += 1
raise RuntimeError("Hash table is full")
def search(self, key: int) -> Optional[Any]:
"""
Search for value by key.
Args:
key: Key to search for
Returns:
Value if found, None otherwise
"""
i = 0
while i < self.size:
index = self._probe(key, i)
self._probe_count += 1
entry = self.table[index]
if entry is None:
return None
elif entry is not self.DELETED and entry[0] == key:
self._comparison_count += 1
return entry[1]
else:
self._comparison_count += 1
i += 1
return None
def get_probe_count(self) -> int:
"""Get total number of probes performed."""
return self._probe_count
def get_comparison_count(self) -> int:
"""Get total number of key comparisons performed."""
return self._comparison_count
def reset_counts(self) -> None:
"""Reset probe and comparison counters."""
self._probe_count = 0
self._comparison_count = 0
def delete(self, key: int) -> bool:
"""
Delete key-value pair.
Args:
key: Key to delete
Returns:
True if deleted, False if not found
"""
i = 0
while i < self.size:
index = self._probe(key, i)
entry = self.table[index]
if entry is None:
return False
elif entry is not self.DELETED and entry[0] == key:
self.table[index] = self.DELETED
self.count -= 1
return True
i += 1
return False
class HashTableSeparateChaining:
"""
Hash table using separate chaining for collision resolution.
Each bucket contains a linked list of key-value pairs.
"""
class Node:
"""Node for linked list in separate chaining."""
def __init__(self, key: int, value: Any):
self.key = key
self.value = value
self.next: Optional['HashTableSeparateChaining.Node'] = None
def __init__(
self,
size: int,
hash_func: Optional[Callable] = None,
load_factor_threshold: float = 1.0
):
"""
Initialize hash table with separate chaining.
Args:
size: Initial size of hash table
hash_func: Hash function to use (default: division method)
load_factor_threshold: Maximum load factor before resizing
"""
self.size = size
self.count = 0
self.buckets: List[Optional['HashTableSeparateChaining.Node']] = [None] * size
self.hash_func = hash_func or (lambda k, s: division_hash(k, s))
self.load_factor_threshold = load_factor_threshold
self._comparison_count = 0
def _load_factor(self) -> float:
"""Calculate current load factor."""
return self.count / self.size
def _resize(self) -> None:
"""Resize table when load factor exceeds threshold."""
old_buckets = self.buckets
old_size = self.size
self.size *= 2
self.count = 0
self.buckets = [None] * self.size
# Rehash all existing entries
for head in old_buckets:
current = head
while current is not None:
self.insert(current.key, current.value)
current = current.next
def insert(self, key: int, value: Any) -> None:
"""
Insert key-value pair.
Args:
key: Key to insert
value: Value to store
"""
if self._load_factor() >= self.load_factor_threshold:
self._resize()
index = self.hash_func(key, self.size)
# Check if key already exists
current = self.buckets[index]
while current is not None:
self._comparison_count += 1
if current.key == key:
current.value = value # Update existing key
return
current = current.next
# Insert new node at head of chain
new_node = self.Node(key, value)
new_node.next = self.buckets[index]
self.buckets[index] = new_node
self.count += 1
def search(self, key: int) -> Optional[Any]:
"""
Search for value by key.
Args:
key: Key to search for
Returns:
Value if found, None otherwise
"""
index = self.hash_func(key, self.size)
current = self.buckets[index]
while current is not None:
self._comparison_count += 1
if current.key == key:
return current.value
current = current.next
return None
def get_comparison_count(self) -> int:
"""Get total number of key comparisons performed."""
return self._comparison_count
def reset_counts(self) -> None:
"""Reset comparison counter."""
self._comparison_count = 0
def delete(self, key: int) -> bool:
"""
Delete key-value pair.
Args:
key: Key to delete
Returns:
True if deleted, False if not found
"""
index = self.hash_func(key, self.size)
current = self.buckets[index]
if current is None:
return False
# Check if key is at head
if current.key == key:
self.buckets[index] = current.next
self.count -= 1
return True
# Search in chain
prev = current
current = current.next
while current is not None:
if current.key == key:
prev.next = current.next
self.count -= 1
return True
prev = current
current = current.next
return False
def get_chain_lengths(self) -> List[int]:
"""
Get lengths of all chains for analysis.
Returns:
List of chain lengths
"""
lengths = []
for head in self.buckets:
length = 0
current = head
while current is not None:
length += 1
current = current.next
lengths.append(length)
return lengths
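The linear-probing path in `HashTableOpenAddressing` can be illustrated with a minimal standalone sketch (division hash plus `(h(k) + i) % m` probing, without the resizing or deletion handling of the full class):

```python
# Minimal sketch of linear probing as used by HashTableOpenAddressing
# with probe_type='linear': slot_i = (h(key) + i) % m, stepping i
# until an empty slot (or the same key) is found.
m = 7
table = [None] * m

def insert(key, value):
    for i in range(m):
        idx = ((key % m) + i) % m
        if table[idx] is None or table[idx][0] == key:
            table[idx] = (key, value)
            return idx
    raise RuntimeError("table full")

# 10, 17 and 24 all hash to 3 (mod 7), so they fill consecutive slots:
slots = [insert(k, str(k)) for k in (10, 17, 24)]
print(slots)  # [3, 4, 5]
```

This consecutive run of occupied slots is exactly the primary clustering that quadratic probing and double hashing are designed to break up.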

150
tests/test_hash_functions.py Normal file
View File

@@ -0,0 +1,150 @@
"""
Tests for hash functions.
"""
import pytest
import sys
import os
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..'))
from src.hash_functions import (
division_hash,
multiplication_hash,
universal_hash,
string_hash_simple,
string_hash_polynomial,
string_hash_djb2,
md5_hash,
bad_hash_clustering
)
class TestDivisionHash:
"""Tests for division hash function."""
def test_basic_division_hash(self):
"""Test basic division hash functionality."""
assert division_hash(10, 7) == 3
assert division_hash(22, 7) == 1
assert division_hash(31, 7) == 3
def test_hash_range(self):
"""Test that hash values are in correct range."""
table_size = 11
for key in range(100):
hash_val = division_hash(key, table_size)
assert 0 <= hash_val < table_size
def test_negative_keys(self):
"""Test handling of negative keys."""
# Division with negative keys
assert division_hash(-10, 7) == (-10 % 7)
class TestMultiplicationHash:
"""Tests for multiplication hash function."""
def test_basic_multiplication_hash(self):
"""Test basic multiplication hash functionality."""
hash_val = multiplication_hash(10, 8)
assert 0 <= hash_val < 8
def test_hash_range(self):
"""Test that hash values are in correct range."""
table_size = 16
for key in range(50):
hash_val = multiplication_hash(key, table_size)
assert 0 <= hash_val < table_size
class TestUniversalHash:
"""Tests for universal hash function."""
def test_basic_universal_hash(self):
"""Test basic universal hash functionality."""
p = 101 # Prime larger than max key
a, b = 3, 7
hash_val = universal_hash(10, 11, a, b, p)
assert 0 <= hash_val < 11
def test_hash_range(self):
"""Test that hash values are in correct range."""
table_size = 13
p = 101
a, b = 5, 11
for key in range(50):
hash_val = universal_hash(key, table_size, a, b, p)
assert 0 <= hash_val < table_size
class TestStringHashFunctions:
"""Tests for string hash functions."""
def test_string_hash_simple(self):
"""Test simple string hash function."""
hash_val = string_hash_simple("hello", 11)
assert 0 <= hash_val < 11
def test_string_hash_polynomial(self):
"""Test polynomial string hash function."""
hash_val = string_hash_polynomial("hello", 11)
assert 0 <= hash_val < 11
def test_string_hash_djb2(self):
"""Test DJB2 string hash function."""
hash_val = string_hash_djb2("hello", 11)
assert 0 <= hash_val < 11
def test_string_hash_collisions(self):
"""Test that different strings can produce different hashes."""
table_size = 100
strings = ["hello", "world", "test", "hash", "table"]
hashes = [string_hash_polynomial(s, table_size) for s in strings]
# At least some should be different (not guaranteed all)
assert len(set(hashes)) > 1
def test_md5_hash(self):
"""Test MD5-based hash function."""
hash_val = md5_hash("test", 11)
assert 0 <= hash_val < 11
class TestBadHashFunctions:
"""Tests for bad hash functions (demonstrating poor behavior)."""
def test_bad_hash_clustering(self):
"""Test bad hash function that causes clustering."""
# This should demonstrate poor distribution
table_size = 10
keys = list(range(20))
hashes = [bad_hash_clustering(k, table_size) for k in keys]
# All hashes should be 0 (demonstrating clustering)
assert all(h == 0 for h in hashes)
class TestHashFunctionProperties:
"""Tests for hash function properties."""
def test_deterministic(self):
"""Test that hash functions are deterministic."""
key = 42
table_size = 11
hash1 = division_hash(key, table_size)
hash2 = division_hash(key, table_size)
assert hash1 == hash2
def test_distribution(self):
"""Test that good hash functions distribute keys reasonably."""
table_size = 20
keys = list(range(100))
hashes = [division_hash(k, table_size) for k in keys]
# Count occurrences in each bucket
bucket_counts = {}
for h in hashes:
bucket_counts[h] = bucket_counts.get(h, 0) + 1
# Most buckets should be used (not perfect, but reasonable)
buckets_used = len(bucket_counts)
assert buckets_used > table_size * 0.5 # At least 50% of buckets used
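The universal-hash tests above fix `a` and `b`; in practice one draws them at random per table. A standalone sketch of drawing one member of the family `h_{a,b}(k) = ((a*k + b) mod p) mod m`:

```python
# Sketch of drawing one function from the universal family used by
# universal_hash: a in [1, p-1], b in [0, p-1], p a prime above all keys.
import random

p, m = 101, 13
random.seed(0)
a = random.randint(1, p - 1)
b = random.randint(0, p - 1)

def h(k):
    return ((a * k + b) % p) % m

values = [h(k) for k in range(50)]
assert all(0 <= v < m for v in values)   # always in range
print("buckets used:", len(set(values)))
```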

203
tests/test_hash_tables.py Normal file
View File

@@ -0,0 +1,203 @@
"""
Tests for hash table implementations.
"""
import pytest
import sys
import os
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..'))
from src.hash_tables import (
DirectAddressTable,
HashTableOpenAddressing,
HashTableSeparateChaining
)
from src.hash_functions import division_hash
class TestDirectAddressTable:
"""Tests for direct-address table."""
def test_insert_and_search(self):
"""Test basic insert and search operations."""
table = DirectAddressTable(100)
table.insert(5, "value1")
table.insert(42, "value2")
assert table.search(5) == "value1"
assert table.search(42) == "value2"
assert table.search(10) is None
def test_delete(self):
"""Test delete operation."""
table = DirectAddressTable(100)
table.insert(5, "value1")
table.delete(5)
assert table.search(5) is None
def test_out_of_range_key(self):
"""Test handling of out-of-range keys."""
table = DirectAddressTable(100)
with pytest.raises(ValueError):
table.insert(100, "value") # Out of range
assert table.search(100) is None
class TestHashTableOpenAddressing:
"""Tests for open addressing hash table."""
def test_insert_and_search_linear(self):
"""Test insert and search with linear probing."""
ht = HashTableOpenAddressing(10, probe_type='linear')
ht.insert(10, "value1")
ht.insert(22, "value2")
ht.insert(31, "value3")
assert ht.search(10) == "value1"
assert ht.search(22) == "value2"
assert ht.search(31) == "value3"
assert ht.search(99) is None
def test_insert_and_search_quadratic(self):
"""Test insert and search with quadratic probing."""
ht = HashTableOpenAddressing(10, probe_type='quadratic')
ht.insert(10, "value1")
ht.insert(22, "value2")
assert ht.search(10) == "value1"
assert ht.search(22) == "value2"
def test_insert_and_search_double(self):
"""Test insert and search with double hashing."""
ht = HashTableOpenAddressing(10, probe_type='double')
ht.insert(10, "value1")
ht.insert(22, "value2")
assert ht.search(10) == "value1"
assert ht.search(22) == "value2"
def test_delete(self):
"""Test delete operation."""
ht = HashTableOpenAddressing(10, probe_type='linear')
ht.insert(10, "value1")
ht.insert(22, "value2")
assert ht.delete(10) is True
assert ht.search(10) is None
assert ht.search(22) == "value2"
assert ht.delete(99) is False
def test_update_existing_key(self):
"""Test updating an existing key."""
ht = HashTableOpenAddressing(10, probe_type='linear')
ht.insert(10, "value1")
ht.insert(10, "value2") # Update
assert ht.search(10) == "value2"
def test_resize(self):
"""Test automatic resizing."""
ht = HashTableOpenAddressing(5, probe_type='linear', load_factor_threshold=0.7)
# Insert enough to trigger resize
for i in range(10):
ht.insert(i, f"value{i}")
# All should still be searchable
for i in range(10):
assert ht.search(i) == f"value{i}"
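The open-addressing tests above depend on `src/hash_tables.py`, which this excerpt does not show. As a rough sketch of a linear-probing table that would satisfy these assertions (the tombstone marker, doubling-based resize, and pre-insert threshold check are assumptions, and the quadratic/double probe types the real class supports are omitted):

```python
class HashTableOpenAddressing:
    """Linear-probing sketch of the tested API; not the assignment's code."""

    _TOMBSTONE = object()  # marks deleted slots so probe chains stay intact

    def __init__(self, size, probe_type='linear', load_factor_threshold=0.7):
        self.size = size
        self.count = 0
        self.load_factor_threshold = load_factor_threshold
        self.slots = [None] * size

    def _probe(self, key):
        # Yield slot indices in linear-probe order, starting at hash(key) mod size.
        start = hash(key) % self.size
        for i in range(self.size):
            yield (start + i) % self.size

    def insert(self, key, value):
        if (self.count + 1) / self.size > self.load_factor_threshold:
            self._resize()
        first_free = None  # first tombstone seen, reusable if key is absent
        for idx in self._probe(key):
            slot = self.slots[idx]
            if slot is None:
                target = first_free if first_free is not None else idx
                self.slots[target] = (key, value)
                self.count += 1
                return
            if slot is self._TOMBSTONE:
                if first_free is None:
                    first_free = idx
            elif slot[0] == key:
                self.slots[idx] = (key, value)  # update existing key
                return
        if first_free is not None:
            self.slots[first_free] = (key, value)
            self.count += 1

    def search(self, key):
        for idx in self._probe(key):
            slot = self.slots[idx]
            if slot is None:
                return None  # hit an empty slot: key was never inserted
            if slot is not self._TOMBSTONE and slot[0] == key:
                return slot[1]
        return None

    def delete(self, key):
        for idx in self._probe(key):
            slot = self.slots[idx]
            if slot is None:
                return False
            if slot is not self._TOMBSTONE and slot[0] == key:
                self.slots[idx] = self._TOMBSTONE
                self.count -= 1
                return True
        return False

    def _resize(self):
        # Double the table and re-insert live entries, discarding tombstones.
        old = [s for s in self.slots if s not in (None, self._TOMBSTONE)]
        self.size *= 2
        self.count = 0
        self.slots = [None] * self.size
        for k, v in old:
            self.insert(k, v)
```

The tombstone is what lets `test_delete` pass: without it, deleting key 10 would break the probe chain and make key 22 (which may have probed past slot 10's position) unreachable.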
class TestHashTableSeparateChaining:
    """Tests for separate chaining hash table."""

    def test_insert_and_search(self):
        """Test basic insert and search operations."""
        ht = HashTableSeparateChaining(10)
        ht.insert(10, "value1")
        ht.insert(22, "value2")
        ht.insert(31, "value3")
        assert ht.search(10) == "value1"
        assert ht.search(22) == "value2"
        assert ht.search(31) == "value3"
        assert ht.search(99) is None

    def test_delete(self):
        """Test delete operation."""
        ht = HashTableSeparateChaining(10)
        ht.insert(10, "value1")
        ht.insert(22, "value2")
        assert ht.delete(10) is True
        assert ht.search(10) is None
        assert ht.search(22) == "value2"
        assert ht.delete(99) is False

    def test_update_existing_key(self):
        """Test updating an existing key."""
        ht = HashTableSeparateChaining(10)
        ht.insert(10, "value1")
        ht.insert(10, "value2")  # Update
        assert ht.search(10) == "value2"

    def test_collision_handling(self):
        """Test that collisions are handled correctly."""
        ht = HashTableSeparateChaining(5)  # Small table to force collisions
        keys = [10, 15, 20, 25, 30]
        for key in keys:
            ht.insert(key, f"value{key}")
        # All should be searchable
        for key in keys:
            assert ht.search(key) == f"value{key}"

    def test_chain_lengths(self):
        """Test chain length reporting."""
        ht = HashTableSeparateChaining(5)
        for i in range(10):
            ht.insert(i, f"value{i}")
        chain_lengths = ht.get_chain_lengths()
        # After inserting 10 items, the table will have resized (load factor > 1.0),
        # so the number of chains should match the current table size, not the initial size
        assert len(chain_lengths) == ht.size
        assert sum(chain_lengths) == 10

    def test_resize(self):
        """Test automatic resizing."""
        ht = HashTableSeparateChaining(5, load_factor_threshold=1.0)
        # Insert enough to trigger resize
        for i in range(20):
            ht.insert(i, f"value{i}")
        # All should still be searchable
        for i in range(20):
            assert ht.search(i) == f"value{i}"


class TestHashTableComparison:
    """Tests comparing different hash table implementations."""

    def test_same_operations_different_implementations(self):
        """Test that different implementations handle the same operations."""
        keys = [10, 22, 31, 4, 15, 28, 17, 88, 59]
        ht_oa = HashTableOpenAddressing(20, probe_type='linear')
        ht_sc = HashTableSeparateChaining(20)
        # Insert the same keys into both tables
        for key in keys:
            ht_oa.insert(key, f"value{key}")
            ht_sc.insert(key, f"value{key}")
        # Both should find all keys
        for key in keys:
            assert ht_oa.search(key) == f"value{key}"
            assert ht_sc.search(key) == f"value{key}"
        # Both should delete successfully
        for key in keys[:5]:
            assert ht_oa.delete(key) is True
            assert ht_sc.delete(key) is True
            assert ht_oa.search(key) is None
            assert ht_sc.search(key) is None
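For reference, a minimal separate-chaining table consistent with the API these tests exercise (`insert`, `search`, `delete` returning a bool, `get_chain_lengths`, and resize once the load factor exceeds the threshold) could be sketched as follows. This is an assumption about `src/hash_tables.py`, not the assignment's actual implementation:

```python
class HashTableSeparateChaining:
    """Separate-chaining sketch matching the tested API; not the real code."""

    def __init__(self, size, load_factor_threshold=1.0):
        self.size = size
        self.count = 0
        self.load_factor_threshold = load_factor_threshold
        self.buckets = [[] for _ in range(size)]  # one chain per slot

    def _index(self, key):
        return hash(key) % self.size

    def insert(self, key, value):
        bucket = self.buckets[self._index(key)]
        for i, (k, _) in enumerate(bucket):
            if k == key:
                bucket[i] = (key, value)  # update existing key in place
                return
        bucket.append((key, value))
        self.count += 1
        if self.count / self.size > self.load_factor_threshold:
            self._resize()

    def search(self, key):
        for k, v in self.buckets[self._index(key)]:
            if k == key:
                return v
        return None

    def delete(self, key):
        bucket = self.buckets[self._index(key)]
        for i, (k, _) in enumerate(bucket):
            if k == key:
                del bucket[i]
                self.count -= 1
                return True
        return False

    def get_chain_lengths(self):
        # One entry per current slot, as test_chain_lengths expects.
        return [len(b) for b in self.buckets]

    def _resize(self):
        # Double the table and rehash every stored pair.
        old = [pair for bucket in self.buckets for pair in bucket]
        self.size *= 2
        self.count = 0
        self.buckets = [[] for _ in range(self.size)]
        for k, v in old:
            self.insert(k, v)
```

Note how `test_chain_lengths` pins down the resize behavior: starting from size 5 with the default threshold of 1.0, the sixth insert pushes the load factor above 1.0 and doubles the table, which is why the test compares against `ht.size` rather than the constructor argument.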