Initial commit: Divide-and-conquer sorting algorithms benchmark

- Implement Merge Sort and Quick Sort algorithms with instrumentation
- Add Quick Sort pivot strategies: first, last, median_of_three, random
- Create dataset generators for 5 dataset types (sorted, reverse, random, nearly_sorted, duplicates_heavy)
- Build comprehensive benchmarking CLI with metrics collection
- Add performance measurement (time, memory, comparisons, swaps)
- Configure logging with rotating file handlers
- Generate plots for time and memory vs size
- Include comprehensive test suite with pytest
- Add full documentation in README.md
This commit is contained in:
Carlos Gutierrez
2025-10-30 21:14:37 -04:00
commit 10570af981
15 changed files with 1518 additions and 0 deletions

54
.gitignore vendored Normal file
View File

@@ -0,0 +1,54 @@
# Python
__pycache__/
*.py[cod]
*$py.class
*.so
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
*.egg-info/
.installed.cfg
*.egg
# Virtual environments
venv/
env/
ENV/
.venv
# IDE
.vscode/
.idea/
*.swp
*.swo
*~
# Project specific
results/
plots/
*.log
# OS
.DS_Store
Thumbs.db
# Testing
.pytest_cache/
.coverage
htmlcov/
.tox/
# Type checking
.mypy_cache/
.ruff_cache/

368
README.md Normal file
View File

@@ -0,0 +1,368 @@
# Divide-and-Conquer Sorting Algorithms Benchmark
A comprehensive Python project for benchmarking merge sort and quick sort algorithms across different dataset types and sizes, with detailed performance metrics, logging, and visualization.
## Project Overview
This project implements two divide-and-conquer sorting algorithms (Merge Sort and Quick Sort) and provides a benchmarking framework to evaluate their performance across various dataset characteristics:
- **Merge Sort**: Stable, O(n log n) worst-case time complexity
- **Quick Sort**: In-place, O(n log n) average-case with configurable pivot strategies
The benchmark suite measures:
- Wall-clock time (using `time.perf_counter`)
- Peak memory usage (using `tracemalloc` and `psutil`)
- Comparison and swap counts (when instrumentation is enabled)
- Correctness verification (comparing against Python's `sorted()`)
## Project Structure
```
.
├── src/
│ ├── algorithms/
│ │ ├── merge_sort.py # Merge sort implementation
│ │ └── quick_sort.py # Quick sort with pivot strategies
│ └── bench/
│ ├── benchmark.py # Main CLI benchmark runner
│ ├── datasets.py # Dataset generators
│ ├── metrics.py # Performance measurement utilities
│ └── logging_setup.py # Logging configuration
├── tests/
│ └── test_sorts.py # Comprehensive test suite
├── scripts/
│ └── run_benchmarks.sh # Convenience script to run benchmarks
├── results/ # Auto-created: CSV, JSON, logs
├── plots/ # Auto-created: PNG visualizations
├── pyproject.toml # Project configuration and dependencies
├── .gitignore
└── README.md # This file
```
## Installation
### Prerequisites
- Python 3.8 or higher
- pip (Python package manager)
### Setup
1. Clone the repository:
```bash
git clone <repository-url>
cd divide-and-conquer-analysis
```
2. Install dependencies:
```bash
pip install -e .
```
Or install from `pyproject.toml`:
```bash
pip install -e ".[dev]" # Includes dev dependencies (mypy, ruff, black)
```
## Quick Start
### Run a Simple Benchmark
```bash
python -m src.bench.benchmark \
--algorithms merge,quick \
--datasets sorted,reverse,random \
--sizes 1000,5000,10000 \
--runs 5 \
--seed 42 \
--instrument \
--make-plots
```
### Use the Convenience Script
```bash
./scripts/run_benchmarks.sh
```
## CLI Usage
The benchmark CLI (`src.bench.benchmark`) supports the following arguments:
### Required Arguments
None (all have defaults)
### Optional Arguments
- `--algorithms`: Comma-separated list of algorithms to benchmark
- Options: `merge`, `quick`
- Default: `merge,quick`
- `--pivot`: Pivot strategy for Quick Sort
- Options: `first`, `last`, `median_of_three`, `random`
- Default: `random`
- `--datasets`: Comma-separated list of dataset types
- Options: `sorted`, `reverse`, `random`, `nearly_sorted`, `duplicates_heavy`
- Default: `sorted,reverse,random,nearly_sorted,duplicates_heavy`
- `--sizes`: Comma-separated list of dataset sizes
- Default: `1000,5000,10000,50000`
- Example: `--sizes 1000,5000,10000,50000,100000`
- `--runs`: Number of runs per experiment (for statistical significance)
- Default: `5`
- `--seed`: Random seed for reproducibility
- Default: `42`
- `--outdir`: Output directory for results
- Default: `results`
- `--log-level`: Logging level
- Options: `DEBUG`, `INFO`, `WARNING`, `ERROR`
- Default: `INFO`
- `--instrument`: Enable counting of comparisons and swaps
- Flag (no value)
- `--make-plots`: Generate plots after benchmarking
- Flag (no value)
### Example CLI Commands
**Basic benchmark with default settings:**
```bash
python -m src.bench.benchmark
```
**Full benchmark with all options:**
```bash
python -m src.bench.benchmark \
--algorithms merge,quick \
--pivot random \
--datasets sorted,reverse,random,nearly_sorted,duplicates_heavy \
--sizes 1000,5000,10000,50000 \
--runs 5 \
--seed 42 \
--instrument \
--outdir results \
--log-level INFO \
--make-plots
```
**Compare pivot strategies:**
```bash
for pivot in first last median_of_three random; do
python -m src.bench.benchmark \
--algorithms quick \
--pivot $pivot \
--datasets random \
--sizes 10000,50000 \
--runs 10 \
--seed 42
done
```
**Quick performance check:**
```bash
python -m src.bench.benchmark \
--algorithms merge,quick \
--datasets random \
--sizes 10000 \
--runs 3 \
--make-plots
```
## Output Files
### Results Directory (`results/`)
- **`bench_results.csv`**: Detailed results in CSV format
- Columns: `algorithm`, `pivot`, `dataset`, `size`, `run`, `time_s`, `peak_mem_bytes`, `comparisons`, `swaps`, `seed`
- One row per run
- **`summary.json`**: Aggregated statistics per (algorithm, dataset, size) combination
- Includes: mean, std, best, worst times and memory
- Comparison and swap statistics (if instrumentation enabled)
- **`bench.log`**: Rotating log file (max 10MB, 5 backups)
- Contains: system info, run metadata, progress logs, errors
### Plots Directory (`plots/`)
- **`time_vs_size.png`**: Line chart showing sorting time vs array size
- Separate subplot for each dataset type
- One line per algorithm
- **`memory_vs_size.png`**: Line chart showing memory usage vs array size
- Separate subplot for each dataset type
- One line per algorithm
## Reproducing Results
### Generate a Plot
After running benchmarks:
```bash
python -m src.bench.benchmark \
--algorithms merge,quick \
--datasets random \
--sizes 1000,5000,10000,50000 \
--runs 5 \
--seed 42 \
--make-plots
```
Plots will be automatically generated in `plots/` directory.
### Generate CSV from Scratch
```bash
python -m src.bench.benchmark \
--algorithms merge \
--datasets sorted,reverse,random \
--sizes 1000,5000 \
--runs 3 \
--seed 42 \
--outdir results
```
Check `results/bench_results.csv` for the output.
## Logging
Logging is configured via `src.bench.logging_setup.py`:
- **Console output**: Formatted with timestamp, level, and message
- **File output**: Detailed logs with function names and line numbers
- **Rotation**: Log files rotate at 10MB, keeping 5 backups
- **Metadata**: Logs include Python version, OS, architecture, and git commit (if available)
### Log Levels
- `DEBUG`: Detailed diagnostic information
- `INFO`: General informational messages (default)
- `WARNING`: Warning messages
- `ERROR`: Error messages
### Example Log Output
```
2024-01-15 10:30:00 - __main__ - INFO - ================================================================================
2024-01-15 10:30:00 - __main__ - INFO - Benchmark session started
2024-01-15 10:30:00 - __main__ - INFO - Python version: 3.10.5
2024-01-15 10:30:00 - __main__ - INFO - Platform: macOS-13.0
2024-01-15 10:30:00 - __main__ - INFO - Running merge on random size=1000 run=1/5
```
## Testing
Run the test suite:
```bash
pytest tests/ -v
```
Run with coverage:
```bash
pytest tests/ --cov=src --cov-report=html
```
### Test Coverage
The test suite includes:
1. **Unit Tests**:
- Empty arrays
- Single element arrays
- Already sorted arrays
- Reverse sorted arrays
- Random arrays
- Arrays with duplicates
- Large arrays
- Instrumentation tests
2. **Property Tests**:
- Comparison with Python's `sorted()` on random arrays
- Multiple sizes and pivot strategies
## Code Quality
### Type Checking
```bash
mypy src/ tests/
```
### Linting
```bash
ruff check src/ tests/
```
### Formatting
```bash
ruff format src/ tests/
```
Or using black:
```bash
black src/ tests/
```
## Algorithm Details
### Merge Sort
- **Time Complexity**: O(n log n) worst-case, average-case, best-case
- **Space Complexity**: O(n)
- **Stability**: Stable
- **Implementation**: Recursive divide-and-conquer with merging
### Quick Sort
- **Time Complexity**: O(n log n) average-case, O(n²) worst-case
- **Space Complexity**: O(log n) average-case (recursion stack)
- **Stability**: Not stable (in-place implementation)
- **Pivot Strategies**:
- `first`: Always use first element (O(n²) on sorted arrays)
- `last`: Always use last element (O(n²) on reverse sorted arrays)
- `median_of_three`: Use median of first, middle, last
- `random`: Random pivot (good average performance)
## Dataset Types
1. **sorted**: Array already in ascending order `[0, 1, 2, ..., n-1]`
2. **reverse**: Array in descending order `[n-1, n-2, ..., 0]`
3. **random**: Random integers from `[0, 10*n)` range
4. **nearly_sorted**: Sorted array with ~1% of elements swapped
5. **duplicates_heavy**: Array with many duplicate values (only `n/10` distinct values)
## Performance Considerations
- Benchmarks use `time.perf_counter()` for high-resolution timing
- Memory measurement uses both `tracemalloc` and `psutil` for accuracy
- Multiple runs per experiment reduce variance
- Seeded randomness ensures reproducibility
## Contributing
1. Follow Python type hints (checked with mypy)
2. Maintain test coverage
3. Run linting before committing
4. Update README for significant changes
## License
[Specify your license here]
## Acknowledgments
- Algorithms based on standard divide-and-conquer implementations
- Benchmarking framework inspired by best practices in performance testing

49
pyproject.toml Normal file
View File

@@ -0,0 +1,49 @@
[build-system]
requires = ["setuptools>=61.0", "wheel"]
build-backend = "setuptools.build_meta"
[project]
name = "algorithms-week2"
version = "0.1.0"
description = "Divide-and-conquer sorting algorithms benchmark"
requires-python = ">=3.8"
dependencies = [
"numpy>=1.21.0",
"psutil>=5.8.0",
"matplotlib>=3.5.0",
"pytest>=7.0.0",
"pytest-cov>=3.0.0",
]
[project.optional-dependencies]
dev = [
"mypy>=0.950",
"ruff>=0.0.200",
"black>=22.0.0",
]
[tool.mypy]
python_version = "3.8"
warn_return_any = true
warn_unused_configs = true
disallow_untyped_defs = false
check_untyped_defs = true
[tool.ruff]
line-length = 100
target-version = "py38"
[tool.ruff.lint]
select = ["E", "F", "W", "I"]
ignore = ["E501"]
[tool.black]
line-length = 100
target-version = ["py38"]
[tool.pytest.ini_options]
testpaths = ["tests"]
python_files = "test_*.py"
python_classes = "Test*"
python_functions = "test_*"

20
scripts/run_benchmarks.sh Executable file
View File

@@ -0,0 +1,20 @@
#!/bin/bash
# Run benchmarks script
set -e
echo "Running sorting algorithm benchmarks..."
python -m src.bench.benchmark \
--algorithms merge,quick \
--datasets sorted,reverse,random,nearly_sorted,duplicates_heavy \
--sizes 1000,5000,10000,50000 \
--runs 5 \
--seed 42 \
--instrument \
--outdir results \
--log-level INFO \
--make-plots
echo "Benchmarks completed. Check results/ and plots/ directories."

2
src/__init__.py Normal file
View File

@@ -0,0 +1,2 @@
"""Algorithms Week 2: Divide-and-Conquer Sorting Benchmarks."""

View File

@@ -0,0 +1,2 @@
"""Sorting algorithm implementations."""

View File

@@ -0,0 +1,53 @@
"""Merge Sort implementation with instrumentation support."""
from typing import List, Optional, Callable
def merge_sort(
arr: List[int],
instrument: Optional[Callable[[str], None]] = None,
) -> List[int]:
"""
Sort array using merge sort algorithm.
Args:
arr: List of integers to sort
instrument: Optional callback function for counting operations.
Called with 'comparison' or 'swap' strings.
Returns:
Sorted copy of the input array.
"""
if len(arr) <= 1:
return arr[:]
def _merge(left: List[int], right: List[int]) -> List[int]:
"""Merge two sorted arrays."""
result: List[int] = []
i, j = 0, 0
while i < len(left) and j < len(right):
if instrument:
instrument("comparison")
if left[i] <= right[j]:
result.append(left[i])
i += 1
else:
result.append(right[j])
j += 1
result.extend(left[i:])
result.extend(right[j:])
return result
def _merge_sort_recursive(arr_inner: List[int]) -> List[int]:
"""Recursive merge sort helper."""
if len(arr_inner) <= 1:
return arr_inner[:]
mid = len(arr_inner) // 2
left = _merge_sort_recursive(arr_inner[:mid])
right = _merge_sort_recursive(arr_inner[mid:])
return _merge(left, right)
return _merge_sort_recursive(arr)

View File

@@ -0,0 +1,97 @@
"""Quick Sort implementation with pivot strategies and instrumentation support."""
from typing import List, Optional, Callable, Literal
import random
PivotStrategy = Literal["first", "last", "median_of_three", "random"]
def quick_sort(
arr: List[int],
pivot_strategy: PivotStrategy = "first",
instrument: Optional[Callable[[str], None]] = None,
seed: Optional[int] = None,
) -> List[int]:
"""
Sort array using quick sort algorithm.
Args:
arr: List of integers to sort
pivot_strategy: Strategy for selecting pivot ('first', 'last',
'median_of_three', 'random')
instrument: Optional callback function for counting operations.
Called with 'comparison' or 'swap' strings.
seed: Optional random seed for 'random' pivot strategy
Returns:
Sorted copy of the input array.
"""
if len(arr) <= 1:
return arr[:]
arr_copy = arr[:]
def _choose_pivot(left: int, right: int) -> int:
"""Choose pivot index based on strategy."""
if pivot_strategy == "first":
return left
elif pivot_strategy == "last":
return right
elif pivot_strategy == "median_of_three":
mid = (left + right) // 2
if instrument:
instrument("comparison")
instrument("comparison")
if arr_copy[left] <= arr_copy[mid] <= arr_copy[right] or \
arr_copy[right] <= arr_copy[mid] <= arr_copy[left]:
return mid
elif arr_copy[mid] <= arr_copy[left] <= arr_copy[right] or \
arr_copy[right] <= arr_copy[left] <= arr_copy[mid]:
return left
else:
return right
elif pivot_strategy == "random":
return random.randint(left, right)
else:
raise ValueError(f"Unknown pivot strategy: {pivot_strategy}")
def _partition(left: int, right: int, pivot_idx: int) -> int:
"""Partition array around pivot and return final pivot position."""
pivot_val = arr_copy[pivot_idx]
# Move pivot to end
arr_copy[pivot_idx], arr_copy[right] = arr_copy[right], arr_copy[pivot_idx]
if instrument:
instrument("swap")
store_idx = left
for i in range(left, right):
if instrument:
instrument("comparison")
if arr_copy[i] <= pivot_val:
if i != store_idx:
arr_copy[i], arr_copy[store_idx] = arr_copy[store_idx], arr_copy[i]
if instrument:
instrument("swap")
store_idx += 1
# Move pivot to final position
arr_copy[store_idx], arr_copy[right] = arr_copy[right], arr_copy[store_idx]
if instrument:
instrument("swap")
return store_idx
def _quick_sort_recursive(left: int, right: int) -> None:
"""Recursive quick sort helper."""
if left < right:
pivot_idx = _choose_pivot(left, right)
final_pivot = _partition(left, right, pivot_idx)
_quick_sort_recursive(left, final_pivot - 1)
_quick_sort_recursive(final_pivot + 1, right)
if seed is not None:
random.seed(seed)
_quick_sort_recursive(0, len(arr_copy) - 1)
return arr_copy

2
src/bench/__init__.py Normal file
View File

@@ -0,0 +1,2 @@
"""Benchmarking utilities."""

433
src/bench/benchmark.py Normal file
View File

@@ -0,0 +1,433 @@
"""Benchmark CLI for sorting algorithms."""
import argparse
import csv
import json
import sys
from pathlib import Path
from typing import List, Dict, Any, Optional
import random
from src.algorithms.merge_sort import merge_sort
from src.algorithms.quick_sort import quick_sort, PivotStrategy
from src.bench.datasets import generate_dataset, DatasetType
from src.bench.metrics import measure_sort_performance, Metrics, aggregate_metrics
from src.bench.logging_setup import setup_logging, get_logger
def parse_args() -> argparse.Namespace:
"""Parse command line arguments."""
parser = argparse.ArgumentParser(
description="Benchmark divide-and-conquer sorting algorithms",
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)
parser.add_argument(
"--algorithms",
type=str,
default="merge,quick",
help="Comma-separated list of algorithms (merge, quick)",
)
parser.add_argument(
"--pivot",
type=str,
default="random",
choices=["first", "last", "median_of_three", "random"],
help="Pivot strategy for Quick Sort",
)
parser.add_argument(
"--datasets",
type=str,
default="sorted,reverse,random,nearly_sorted,duplicates_heavy",
help="Comma-separated list of dataset types",
)
parser.add_argument(
"--sizes",
type=str,
default="1000,5000,10000,50000",
help="Comma-separated list of dataset sizes",
)
parser.add_argument(
"--runs",
type=int,
default=5,
help="Number of runs per experiment",
)
parser.add_argument(
"--seed",
type=int,
default=42,
help="Random seed for reproducibility",
)
parser.add_argument(
"--outdir",
type=str,
default="results",
help="Output directory for results",
)
parser.add_argument(
"--log-level",
type=str,
default="INFO",
choices=["DEBUG", "INFO", "WARNING", "ERROR"],
help="Logging level",
)
parser.add_argument(
"--instrument",
action="store_true",
help="Count comparisons and swaps",
)
parser.add_argument(
"--make-plots",
action="store_true",
help="Generate plots after benchmarking",
)
return parser.parse_args()
def run_benchmark(
algorithm: str,
pivot_strategy: Optional[str],
dataset_type: DatasetType,
size: int,
runs: int,
seed: int,
instrument: bool,
logger: Any,
) -> List[Dict[str, Any]]:
"""
Run benchmark for a single algorithm/dataset/size combination.
Returns:
List of result dictionaries, one per run
"""
results: List[Dict[str, Any]] = []
# Get sort function
if algorithm == "merge":
sort_func = merge_sort
sort_kwargs: Dict[str, Any] = {}
elif algorithm == "quick":
sort_func = quick_sort
sort_kwargs = {
"pivot_strategy": pivot_strategy or "first",
}
# Only pass seed for random pivot strategy
if pivot_strategy == "random":
sort_kwargs["seed"] = seed
else:
raise ValueError(f"Unknown algorithm: {algorithm}")
for run_idx in range(runs):
logger.info(
f"Running {algorithm} on {dataset_type} size={size} run={run_idx+1}/{runs}"
)
# Generate dataset with unique seed per run
dataset_seed = seed + run_idx * 1000 if seed is not None else None
arr = generate_dataset(size, dataset_type, seed=dataset_seed)
# For quick sort with random pivot, use unique seed per run
if algorithm == "quick" and pivot_strategy == "random":
sort_kwargs["seed"] = (seed + run_idx * 1000) if seed is not None else None
# Run benchmark
sorted_arr, metrics = measure_sort_performance(
sort_func,
arr,
instrument=instrument,
**sort_kwargs,
)
# Verify correctness
expected = sorted(arr)
if sorted_arr != expected:
logger.error(
f"Correctness check failed for {algorithm} on {dataset_type} "
f"size={size} run={run_idx+1}"
)
logger.error(f"Expected: {expected[:10]}...")
logger.error(f"Got: {sorted_arr[:10]}...")
return [] # Return empty to indicate failure
# Store result
result = {
"algorithm": algorithm,
"pivot": pivot_strategy if algorithm == "quick" else None,
"dataset": dataset_type,
"size": size,
"run": run_idx + 1,
"time_s": metrics.time_seconds,
"peak_mem_bytes": metrics.peak_memory_bytes,
"comparisons": metrics.comparisons if instrument else None,
"swaps": metrics.swaps if instrument else None,
"seed": seed,
}
results.append(result)
return results
def save_results_csv(results: List[Dict[str, Any]], csv_path: Path) -> None:
"""Save results to CSV file."""
if not results:
return
file_exists = csv_path.exists()
with open(csv_path, "a", newline="") as f:
writer = csv.DictWriter(f, fieldnames=results[0].keys())
if not file_exists:
writer.writeheader()
writer.writerows(results)
def save_summary_json(results: List[Dict[str, Any]], json_path: Path) -> None:
"""Save aggregated summary to JSON file."""
if not results:
return
# Group by (algorithm, pivot, dataset, size)
grouped: Dict[tuple, List[Metrics]] = {}
for result in results:
key = (
result["algorithm"],
result.get("pivot"),
result["dataset"],
result["size"],
)
metrics = Metrics()
metrics.time_seconds = result["time_s"]
metrics.peak_memory_bytes = result["peak_mem_bytes"]
metrics.comparisons = result.get("comparisons") or 0
metrics.swaps = result.get("swaps") or 0
if key not in grouped:
grouped[key] = []
grouped[key].append(metrics)
# Aggregate
summary: Dict[str, Any] = {}
for key, metrics_list in grouped.items():
algo, pivot, dataset, size = key
key_str = f"{algo}_{pivot or 'N/A'}_{dataset}_{size}"
summary[key_str] = aggregate_metrics(metrics_list)
summary[key_str]["algorithm"] = algo
summary[key_str]["pivot"] = pivot
summary[key_str]["dataset"] = dataset
summary[key_str]["size"] = size
# Merge with existing summary if it exists
if json_path.exists():
with open(json_path, "r") as f:
existing = json.load(f)
existing.update(summary)
summary = existing
with open(json_path, "w") as f:
json.dump(summary, f, indent=2)
def generate_plots(results: List[Dict[str, Any]], plots_dir: Path, logger: Any) -> None:
"""Generate plots from results."""
try:
import matplotlib.pyplot as plt
import matplotlib
matplotlib.use('Agg') # Non-interactive backend
except ImportError:
logger.warning("matplotlib not available, skipping plots")
return
plots_dir.mkdir(parents=True, exist_ok=True)
if not results:
logger.warning("No results to plot")
return
# Group results by algorithm and dataset
algorithms = sorted(set(r["algorithm"] for r in results))
datasets = sorted(set(r["dataset"] for r in results))
sizes = sorted(set(r["size"] for r in results))
# Time vs size plots
fig, axes = plt.subplots(len(datasets), 1, figsize=(10, 5 * len(datasets)))
if len(datasets) == 1:
axes = [axes]
for idx, dataset in enumerate(datasets):
ax = axes[idx]
for algo in algorithms:
algo_results = [
r for r in results
if r["algorithm"] == algo and r["dataset"] == dataset
]
if not algo_results:
continue
# Average time per size
size_times: Dict[int, List[float]] = {}
for r in algo_results:
size = r["size"]
if size not in size_times:
size_times[size] = []
size_times[size].append(r["time_s"])
avg_times = [sum(size_times[s]) / len(size_times[s]) for s in sizes if s in size_times]
plot_sizes = [s for s in sizes if s in size_times]
ax.plot(plot_sizes, avg_times, marker="o", label=algo)
ax.set_xlabel("Array Size")
ax.set_ylabel("Time (seconds)")
ax.set_title(f"Sorting Time vs Size - {dataset}")
ax.legend()
ax.grid(True, alpha=0.3)
plt.tight_layout()
plt.savefig(plots_dir / "time_vs_size.png", dpi=150)
plt.close()
# Memory vs size plots
fig, axes = plt.subplots(len(datasets), 1, figsize=(10, 5 * len(datasets)))
if len(datasets) == 1:
axes = [axes]
for idx, dataset in enumerate(datasets):
ax = axes[idx]
for algo in algorithms:
algo_results = [
r for r in results
if r["algorithm"] == algo and r["dataset"] == dataset
]
if not algo_results:
continue
# Average memory per size
size_memories: Dict[int, List[int]] = {}
for r in algo_results:
size = r["size"]
if size not in size_memories:
size_memories[size] = []
size_memories[size].append(r["peak_mem_bytes"])
avg_memories = [
sum(size_memories[s]) / len(size_memories[s])
for s in sizes if s in size_memories
]
plot_sizes = [s for s in sizes if s in size_memories]
ax.plot(plot_sizes, avg_memories, marker="o", label=algo)
ax.set_xlabel("Array Size")
ax.set_ylabel("Peak Memory (bytes)")
ax.set_title(f"Memory Usage vs Size - {dataset}")
ax.legend()
ax.grid(True, alpha=0.3)
plt.tight_layout()
plt.savefig(plots_dir / "memory_vs_size.png", dpi=150)
plt.close()
logger.info(f"Plots saved to {plots_dir}")
def main() -> int:
"""Main entry point."""
args = parse_args()
# Setup paths
outdir = Path(args.outdir)
outdir.mkdir(parents=True, exist_ok=True)
plots_dir = Path("plots")
# Setup logging
setup_logging(outdir, args.log_level)
logger = get_logger(__name__)
# Parse arguments
algorithms = [a.strip() for a in args.algorithms.split(",")]
datasets = [d.strip() for d in args.datasets.split(",")]
sizes = [int(s.strip()) for s in args.sizes.split(",")]
# Validate algorithms
valid_algorithms = {"merge", "quick"}
for algo in algorithms:
if algo not in valid_algorithms:
logger.error(f"Invalid algorithm: {algo}")
return 1
# Set random seed
if args.seed is not None:
random.seed(args.seed)
# Run benchmarks
all_results: List[Dict[str, Any]] = []
correctness_failed = False
for algorithm in algorithms:
pivot_strategy = args.pivot if algorithm == "quick" else None
for dataset_type in datasets:
for size in sizes:
try:
results = run_benchmark(
algorithm,
pivot_strategy,
dataset_type, # type: ignore
size,
args.runs,
args.seed,
args.instrument,
logger,
)
if not results:
correctness_failed = True
else:
all_results.extend(results)
except Exception as e:
logger.error(
f"Error running benchmark: {algorithm}, {dataset_type}, {size}",
exc_info=True,
)
correctness_failed = True
# Save results
csv_path = outdir / "bench_results.csv"
json_path = outdir / "summary.json"
if all_results:
save_results_csv(all_results, csv_path)
save_summary_json(all_results, json_path)
logger.info(f"Results saved to {csv_path} and {json_path}")
# Generate plots
if args.make_plots or all_results:
generate_plots(all_results, plots_dir, logger)
# Exit with error if correctness failed
if correctness_failed:
logger.error("Benchmark failed due to correctness check failures")
return 1
logger.info("Benchmark completed successfully")
return 0
if __name__ == "__main__":
sys.exit(main())

54
src/bench/datasets.py Normal file
View File

@@ -0,0 +1,54 @@
"""Dataset generators for benchmarking."""
from typing import List, Literal, Optional
import random
DatasetType = Literal["sorted", "reverse", "random", "nearly_sorted", "duplicates_heavy"]
def generate_dataset(
size: int,
dataset_type: DatasetType,
seed: Optional[int] = None,
) -> List[int]:
"""
Generate a dataset of specified type and size.
Args:
size: Number of elements in the dataset
dataset_type: Type of dataset to generate
seed: Random seed for reproducibility
Returns:
List of integers with the specified characteristics
"""
if seed is not None:
random.seed(seed)
if dataset_type == "sorted":
return list(range(size))
elif dataset_type == "reverse":
return list(range(size - 1, -1, -1))
elif dataset_type == "random":
return [random.randint(0, size * 10) for _ in range(size)]
elif dataset_type == "nearly_sorted":
arr = list(range(size))
# Perform a few swaps (about 1% of elements)
num_swaps = max(1, size // 100)
for _ in range(num_swaps):
i = random.randint(0, size - 1)
j = random.randint(0, size - 1)
arr[i], arr[j] = arr[j], arr[i]
return arr
elif dataset_type == "duplicates_heavy":
# Generate array with many duplicate values
# Use only a small set of distinct values
distinct_values = max(1, size // 10)
return [random.randint(0, distinct_values - 1) for _ in range(size)]
else:
raise ValueError(f"Unknown dataset type: {dataset_type}")

View File

@@ -0,0 +1,96 @@
"""Logging configuration for benchmarks."""
import logging
import sys
from pathlib import Path
from logging.handlers import RotatingFileHandler
from typing import Optional
import platform
def setup_logging(
log_dir: Path,
log_level: str = "INFO",
log_file: str = "bench.log",
) -> None:
"""
Configure logging to both console and rotating file.
Args:
log_dir: Directory to write log files
log_level: Logging level (DEBUG, INFO, WARNING, ERROR)
log_file: Name of the log file
"""
log_dir.mkdir(parents=True, exist_ok=True)
log_path = log_dir / log_file
# Convert string level to logging constant
numeric_level = getattr(logging, log_level.upper(), logging.INFO)
# Create logger
logger = logging.getLogger()
logger.setLevel(numeric_level)
# Remove existing handlers to avoid duplicates
logger.handlers.clear()
# Console handler
console_handler = logging.StreamHandler(sys.stdout)
console_handler.setLevel(numeric_level)
console_format = logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(message)s',
datefmt='%Y-%m-%d %H:%M:%S'
)
console_handler.setFormatter(console_format)
logger.addHandler(console_handler)
# File handler with rotation (10MB max, keep 5 backups)
file_handler = RotatingFileHandler(
log_path,
maxBytes=10 * 1024 * 1024,
backupCount=5,
encoding='utf-8',
)
file_handler.setLevel(numeric_level)
file_format = logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(funcName)s:%(lineno)d - %(message)s',
datefmt='%Y-%m-%d %H:%M:%S'
)
file_handler.setFormatter(file_format)
logger.addHandler(file_handler)
# Log system information
logger.info("=" * 80)
logger.info("Benchmark session started")
logger.info(f"Python version: {sys.version}")
logger.info(f"Platform: {platform.platform()}")
logger.info(f"Architecture: {platform.machine()}")
# Try to get git commit if available
try:
import subprocess
result = subprocess.run(
["git", "rev-parse", "HEAD"],
capture_output=True,
text=True,
timeout=2,
)
if result.returncode == 0:
logger.info(f"Git commit: {result.stdout.strip()}")
except (subprocess.TimeoutExpired, FileNotFoundError, subprocess.SubprocessError):
pass
logger.info("=" * 80)
def get_logger(name: Optional[str] = None) -> logging.Logger:
"""
Get a logger instance.
Args:
name: Logger name (defaults to calling module)
Returns:
Logger instance
"""
return logging.getLogger(name)

125
src/bench/metrics.py Normal file
View File

@@ -0,0 +1,125 @@
"""Performance metrics collection."""
import time
import tracemalloc
from typing import Dict, Any, Optional, List
import psutil
import os
class Metrics:
"""Container for benchmark metrics."""
def __init__(self) -> None:
self.time_seconds: float = 0.0
self.peak_memory_bytes: int = 0
self.comparisons: int = 0
self.swaps: int = 0
def to_dict(self) -> Dict[str, Any]:
"""Convert metrics to dictionary."""
return {
"time_s": self.time_seconds,
"peak_mem_bytes": self.peak_memory_bytes,
"comparisons": self.comparisons,
"swaps": self.swaps,
}
def measure_sort_performance(
sort_func,
arr: List[int],
*args,
instrument: bool = False,
**kwargs,
) -> tuple[List[int], Metrics]:
"""
Measure performance of a sorting function.
Args:
sort_func: Sorting function to benchmark
arr: Input array to sort
*args: Additional positional arguments for sort_func
instrument: Whether to count comparisons and swaps
**kwargs: Additional keyword arguments for sort_func
Returns:
Tuple of (sorted_array, metrics)
"""
metrics = Metrics()
# Setup instrumentation
if instrument:
counters: Dict[str, int] = {"comparison": 0, "swap": 0}
def instrument_callback(op: str) -> None:
if op in counters:
counters[op] += 1
if "instrument" not in kwargs:
kwargs["instrument"] = instrument_callback
# Measure memory before
process = psutil.Process(os.getpid())
tracemalloc.start()
# Measure time
start_time = time.perf_counter()
sorted_arr = sort_func(arr, *args, **kwargs)
end_time = time.perf_counter()
# Measure memory
current, peak = tracemalloc.get_traced_memory()
tracemalloc.stop()
rss_memory = process.memory_info().rss
metrics.time_seconds = end_time - start_time
metrics.peak_memory_bytes = max(peak, rss_memory)
if instrument:
metrics.comparisons = counters.get("comparison", 0)
metrics.swaps = counters.get("swap", 0)
return sorted_arr, metrics
def aggregate_metrics(metrics_list: List[Metrics]) -> Dict[str, Any]:
"""
Aggregate metrics across multiple runs.
Args:
metrics_list: List of Metrics objects from multiple runs
Returns:
Dictionary with aggregated statistics
"""
if not metrics_list:
return {}
times = [m.time_seconds for m in metrics_list]
memories = [m.peak_memory_bytes for m in metrics_list]
comparisons = [m.comparisons for m in metrics_list if m.comparisons > 0]
swaps = [m.swaps for m in metrics_list if m.swaps > 0]
import statistics
result: Dict[str, Any] = {
"time_mean_s": statistics.mean(times),
"time_std_s": statistics.stdev(times) if len(times) > 1 else 0.0,
"time_best_s": min(times),
"time_worst_s": max(times),
"memory_mean_bytes": statistics.mean(memories),
"memory_std_bytes": statistics.stdev(memories) if len(memories) > 1 else 0.0,
"memory_peak_bytes": max(memories),
"runs": len(metrics_list),
}
if comparisons:
result["comparisons_mean"] = statistics.mean(comparisons)
result["comparisons_std"] = statistics.stdev(comparisons) if len(comparisons) > 1 else 0.0
if swaps:
result["swaps_mean"] = statistics.mean(swaps)
result["swaps_std"] = statistics.stdev(swaps) if len(swaps) > 1 else 0.0
return result

2
tests/__init__.py Normal file
View File

@@ -0,0 +1,2 @@
"""Tests for sorting algorithms."""

161
tests/test_sorts.py Normal file
View File

@@ -0,0 +1,161 @@
"""Tests for sorting algorithms."""
import pytest
from typing import List
import random
from src.algorithms.merge_sort import merge_sort
from src.algorithms.quick_sort import quick_sort, PivotStrategy
class TestMergeSort:
"""Tests for merge sort algorithm."""
def test_empty_array(self) -> None:
"""Test sorting empty array."""
assert merge_sort([]) == []
def test_single_element(self) -> None:
"""Test sorting array with single element."""
assert merge_sort([42]) == [42]
def test_already_sorted(self) -> None:
"""Test sorting already sorted array."""
arr = [1, 2, 3, 4, 5]
assert merge_sort(arr) == [1, 2, 3, 4, 5]
# Original should not be modified
assert arr == [1, 2, 3, 4, 5]
def test_reverse_sorted(self) -> None:
"""Test sorting reverse sorted array."""
arr = [5, 4, 3, 2, 1]
assert merge_sort(arr) == [1, 2, 3, 4, 5]
def test_random_array(self) -> None:
"""Test sorting random array."""
arr = [3, 1, 4, 1, 5, 9, 2, 6, 5]
assert merge_sort(arr) == [1, 1, 2, 3, 4, 5, 5, 6, 9]
def test_duplicates(self) -> None:
"""Test sorting array with duplicates."""
arr = [5, 5, 5, 3, 3, 1]
assert merge_sort(arr) == [1, 3, 3, 5, 5, 5]
def test_large_array(self) -> None:
"""Test sorting large array."""
arr = list(range(1000, 0, -1))
result = merge_sort(arr)
assert result == list(range(1, 1001))
def test_instrumentation(self) -> None:
"""Test instrumentation callback."""
counters: dict = {"comparison": 0, "swap": 0}
def instrument(op: str) -> None:
if op in counters:
counters[op] += 1
arr = [3, 1, 4, 1, 5]
result = merge_sort(arr, instrument=instrument)
assert result == [1, 1, 3, 4, 5]
assert counters["comparison"] > 0
# Merge sort doesn't do swaps in traditional sense
assert counters["swap"] == 0
class TestQuickSort:
"""Tests for quick sort algorithm."""
@pytest.mark.parametrize("pivot", ["first", "last", "median_of_three", "random"])
def test_empty_array(self, pivot: PivotStrategy) -> None:
"""Test sorting empty array."""
assert quick_sort([], pivot_strategy=pivot) == []
@pytest.mark.parametrize("pivot", ["first", "last", "median_of_three", "random"])
def test_single_element(self, pivot: PivotStrategy) -> None:
"""Test sorting array with single element."""
assert quick_sort([42], pivot_strategy=pivot) == [42]
@pytest.mark.parametrize("pivot", ["first", "last", "median_of_three", "random"])
def test_already_sorted(self, pivot: PivotStrategy) -> None:
"""Test sorting already sorted array."""
arr = [1, 2, 3, 4, 5]
result = quick_sort(arr, pivot_strategy=pivot, seed=42)
assert result == [1, 2, 3, 4, 5]
# Original should not be modified
assert arr == [1, 2, 3, 4, 5]
@pytest.mark.parametrize("pivot", ["first", "last", "median_of_three", "random"])
def test_reverse_sorted(self, pivot: PivotStrategy) -> None:
"""Test sorting reverse sorted array."""
arr = [5, 4, 3, 2, 1]
result = quick_sort(arr, pivot_strategy=pivot, seed=42)
assert result == [1, 2, 3, 4, 5]
@pytest.mark.parametrize("pivot", ["first", "last", "median_of_three", "random"])
def test_random_array(self, pivot: PivotStrategy) -> None:
"""Test sorting random array."""
arr = [3, 1, 4, 1, 5, 9, 2, 6, 5]
result = quick_sort(arr, pivot_strategy=pivot, seed=42)
assert result == [1, 1, 2, 3, 4, 5, 5, 6, 9]
@pytest.mark.parametrize("pivot", ["first", "last", "median_of_three", "random"])
def test_duplicates(self, pivot: PivotStrategy) -> None:
"""Test sorting array with duplicates."""
arr = [5, 5, 5, 3, 3, 1]
result = quick_sort(arr, pivot_strategy=pivot, seed=42)
assert result == [1, 3, 3, 5, 5, 5]
@pytest.mark.parametrize("pivot", ["first", "last", "median_of_three", "random"])
def test_large_array(self, pivot: PivotStrategy) -> None:
"""Test sorting large array."""
arr = list(range(1000, 0, -1))
result = quick_sort(arr, pivot_strategy=pivot, seed=42)
assert result == list(range(1, 1001))
def test_instrumentation(self) -> None:
"""Test instrumentation callback."""
counters: dict = {"comparison": 0, "swap": 0}
def instrument(op: str) -> None:
if op in counters:
counters[op] += 1
arr = [3, 1, 4, 1, 5]
result = quick_sort(arr, pivot_strategy="first", instrument=instrument, seed=42)
assert result == [1, 1, 3, 4, 5]
assert counters["comparison"] > 0
assert counters["swap"] > 0
class TestPropertyTests:
"""Property-based tests comparing to Python's sorted()."""
@pytest.mark.parametrize("size", [10, 100, 1000])
def test_merge_sort_property(self, size: int) -> None:
"""Property test: merge_sort should match sorted() for random arrays."""
random.seed(42)
arr = [random.randint(-1000, 1000) for _ in range(size)]
result = merge_sort(arr)
expected = sorted(arr)
assert result == expected
@pytest.mark.parametrize("pivot", ["first", "last", "median_of_three", "random"])
@pytest.mark.parametrize("size", [10, 100, 1000])
def test_quick_sort_property(self, pivot: PivotStrategy, size: int) -> None:
"""Property test: quick_sort should match sorted() for random arrays."""
random.seed(42)
arr = [random.randint(-1000, 1000) for _ in range(size)]
result = quick_sort(arr, pivot_strategy=pivot, seed=42)
expected = sorted(arr)
assert result == expected
if __name__ == "__main__":
pytest.main([__file__, "-v"])