commit c1b0fd3aaf625d4c7db238d1253dd6e9af09147a Author: Carlos Gutierrez Date: Sun Nov 9 21:54:13 2025 -0500 Initial commit: Heapsort and Priority Queue Implementation - Implemented complete Heapsort algorithm with max-heap operations - Implemented binary heap-based Priority Queue with all core operations - Created Task class for task scheduling applications - Implemented Task Scheduler simulation using priority queue - Added comprehensive test suite (70+ tests, all passing) - Implemented sorting algorithm comparison utilities (Heapsort vs Quicksort vs Merge Sort) - Added detailed README with comprehensive analysis and documentation - Included demonstration scripts for all features - Generated performance comparison plots - Modular, well-documented code following academic standards Author: Carlos Gutierrez Email: cgutierrez44833@ucumberlands.edu diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..49cc66d --- /dev/null +++ b/.gitignore @@ -0,0 +1,79 @@ +# Python +__pycache__/ +*.py[cod] +*$py.class +*.so +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +pip-wheel-metadata/ +share/python-wheels/ +*.egg-info/ +.installed.cfg +*.egg +MANIFEST + +# Virtual Environments +venv/ +env/ +ENV/ +env.bak/ +venv.bak/ +.venv + +# IDEs and Editors +.vscode/ +.idea/ +*.swp +*.swo +*~ +.DS_Store +*.sublime-project +*.sublime-workspace + +# Jupyter Notebook +.ipynb_checkpoints +*.ipynb + +# pytest +.pytest_cache/ +.coverage +htmlcov/ +.tox/ +.hypothesis/ + +# mypy +.mypy_cache/ +.dmypy.json +dmypy.json + +# Pyre type checker +.pyre/ + +# Cursor IDE +.cursor/ + +# Environment variables +.env +.env.local +.env.*.local + +# OS files +.DS_Store +Thumbs.db +*.log + +# Project specific +*.pyc +__pycache__/ + diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..c1b1f34 --- /dev/null +++ b/LICENSE @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) 2025 Carlos Gutierrez + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/README.md b/README.md new file mode 100644 index 0000000..2a6b8ed --- /dev/null +++ b/README.md @@ -0,0 +1,568 @@ +# MSCS532 Assignment 4: Heapsort and Priority Queue Implementation + +**Author:** Carlos Gutierrez +**Email:** cgutierrez44833@ucumberlands.edu +**Course:** MSCS532 - Data Structures and Algorithms + +## Overview + +This project implements and analyzes two fundamental data structures and algorithms: + +1. **Heapsort Algorithm**: A complete implementation of the Heapsort sorting algorithm with detailed complexity analysis +2. **Priority Queue**: A binary heap-based priority queue implementation with support for task scheduling + +The project includes comprehensive testing, empirical performance comparisons with other sorting algorithms, and detailed documentation suitable for academic submission. + +## Project Structure + +``` +MSCS532_Assignment4/ +├── src/ +│ ├── __init__.py # Package initialization +│ ├── heapsort.py # Heapsort implementation +│ ├── priority_queue.py # Priority Queue implementation +│ ├── task.py # Task class definition +│ ├── scheduler.py # Task scheduler simulation +│ └── comparison.py # Sorting algorithm comparison utilities +├── tests/ +│ ├── __init__.py +│ ├── test_heapsort.py # Tests for heapsort +│ ├── test_priority_queue.py # Tests for priority queue +│ ├── test_task.py # Tests for task class +│ ├── test_scheduler.py # Tests for task scheduler +│ └── test_comparison.py # Tests for comparison utilities +├── examples/ +│ ├── heapsort_demo.py # Heapsort demonstration +│ ├── priority_queue_demo.py # Priority queue demonstration +│ ├── comparison_demo.py # Sorting comparison demonstration +│ ├── scheduler_simulation.py # Task scheduler simulation +│ └── generate_plots.py # Plot generation script +├── docs/ +│ ├── sorting_comparison.png # Performance comparison plots +│ ├── sorting_comparison_bar.png # Bar chart comparison +│ └── algorithm_distributions.png # Algorithm distribution plots +├── requirements.txt # Python dependencies +└── README.md # This file +``` + +## Features + +### Heapsort Implementation +- Complete max-heap implementation +- In-place and non-in-place sorting options +- Support for custom key functions +- Time complexity: O(n log n) in all cases +- Space complexity: O(1) for in-place sorting + +### Priority Queue Implementation +- Binary heap-based priority queue +- Support for both max-heap and min-heap configurations +- Core operations: + - `insert(task)`: O(log n) + - `extract_max()` / `extract_min()`: O(log n) + - `increase_key()` / `decrease_key()`: O(n) (can be optimized to O(log n)) + - `is_empty()`: O(1) + - `peek()`: O(1) +- Task class with priority, deadline, and timing information + +### Performance Comparison +- Empirical comparison of Heapsort, Quicksort, and Merge Sort +- Testing on different input distributions: + - Sorted arrays + - Reverse-sorted arrays + - Random arrays +- Performance analysis and visualization + +![Sorting Algorithm Comparison](docs/sorting_comparison.png) + +*Performance comparison across different input distributions* + +![Sorting Algorithm Bar Chart](docs/sorting_comparison_bar.png) + +*Bar chart comparison on random input data* + +![Algorithm Performance by Distribution](docs/algorithm_distributions.png) + +*Individual algorithm performance across different input types* + +## Installation + +### Prerequisites +- Python 3.7 or higher +- pip (Python package manager) + +### Setup + +1. Clone the repository: +```bash +git clone https://github.com/CarGDev/MSCS532_Assignment4 +cd MSCS532_Assignment4 +``` + +2. Install dependencies (if any): +```bash +pip install -r requirements.txt +``` + +Note: This project uses only Python standard library, so no external dependencies are required. + +## Usage + +### Running Tests + +Run all tests: +```bash +python -m pytest tests/ -v +``` + +Or using unittest: +```bash +python -m unittest discover tests -v +``` + +Run specific test modules: +```bash +python -m unittest tests.test_heapsort -v +python -m unittest tests.test_priority_queue -v +python -m unittest tests.test_task -v +python -m unittest tests.test_comparison -v +``` + +### Heapsort Example + +```python +from src.heapsort import heapsort + +# Sort an array +arr = [12, 11, 13, 5, 6, 7] +sorted_arr = heapsort(arr, inplace=False) +print(sorted_arr) # [5, 6, 7, 11, 12, 13] + +# In-place sorting +arr = [3, 1, 4, 1, 5, 9, 2, 6] +heapsort(arr, inplace=True) +print(arr) # [1, 1, 2, 3, 4, 5, 6, 9] +``` + +### Priority Queue Example + +```python +from src.priority_queue import PriorityQueue +from src.task import Task + +# Create a priority queue +pq = PriorityQueue(is_max_heap=True) + +# Insert tasks +pq.insert(Task("T1", priority=10, arrival_time=0.0)) +pq.insert(Task("T2", priority=5, arrival_time=1.0)) +pq.insert(Task("T3", priority=15, arrival_time=2.0)) + +# Extract highest priority task +task = pq.extract_max() +print(task.task_id) # "T3" (highest priority) + +# Check queue status +print(pq.is_empty()) # False +print(pq.size()) # 2 +``` + +### Sorting Comparison Example + +```python +from src.comparison import run_comparison + +# Compare sorting algorithms on different input sizes +results = run_comparison(sizes=[100, 1000, 10000, 100000]) +``` + +### Running Demo Scripts + +```bash +# Heapsort demonstration +python examples/heapsort_demo.py + +# Priority queue demonstration +python examples/priority_queue_demo.py + +# Sorting algorithm comparison +python examples/comparison_demo.py + +# Task scheduler simulation +python examples/scheduler_simulation.py +``` + +## Time Complexity Analysis + +### Heapsort +- **Worst Case**: O(n log n) +- **Average Case**: O(n log n) +- **Best Case**: O(n log n) +- **Space Complexity**: O(1) for in-place, O(n) for non-in-place + +### Priority Queue Operations +- **insert()**: O(log n) +- **extract_max() / extract_min()**: O(log n) +- **increase_key() / decrease_key()**: O(n) - can be optimized to O(log n) with hash map +- **is_empty()**: O(1) +- **peek()**: O(1) +- **Space Complexity**: O(n) + +## Design Decisions + +### Data Structure Choice +The priority queue is implemented using a Python list to represent the binary heap. This choice provides: +- **Ease of Implementation**: Simple index calculations for parent/child relationships +- **Efficiency**: O(1) access to any element, O(log n) for heap operations +- **Memory Efficiency**: No overhead from node objects or pointers + +### Max-Heap vs Min-Heap +The implementation supports both configurations: +- **Max-Heap**: Higher priority values are extracted first (default) +- **Min-Heap**: Lower priority values are extracted first + +### Task Representation +The `Task` class uses a dataclass for clean, readable code with: +- Task identification (ID) +- Priority level +- Timing information (arrival time, deadline) +- Execution metadata + +## Testing + +The project includes comprehensive test coverage: +- **Unit Tests**: Individual function and method testing +- **Edge Cases**: Empty arrays, single elements, duplicates +- **Integration Tests**: Full workflow testing +- **Performance Tests**: Large-scale operation testing + +Run tests with: +```bash +python -m unittest discover tests -v +``` + +## Detailed Analysis and Report + +### Heapsort Implementation + +#### Algorithm Overview + +Heapsort is an in-place sorting algorithm that uses a binary heap data structure. The algorithm consists of two main phases: + +1. **Heap Construction**: Build a max-heap from the input array +2. **Extraction Phase**: Repeatedly extract the maximum element and place it at the end of the array + +#### Implementation Details + +The implementation includes the following key functions: + +- **`_heapify(arr, n, i, key)`**: Maintains the max-heap property for a subtree rooted at index `i`. Time Complexity: O(log n) +- **`_build_max_heap(arr, key)`**: Converts an unsorted array into a max-heap. Time Complexity: O(n) +- **`heapsort(arr, key, inplace)`**: The main sorting function. Time Complexity: O(n log n) + +#### Heapsort Time Complexity Analysis + +**Worst Case: O(n log n)** +- Heap construction: O(n) +- n extractions × O(log n) per extraction: O(n log n) +- **Total**: O(n) + O(n log n) = **O(n log n)** + +**Average Case: O(n log n)** +- The average case follows the same pattern as the worst case + +**Best Case: O(n log n)** +- Unlike some sorting algorithms, Heapsort does not have a best-case scenario that improves performance +- Even if the input is already sorted, the algorithm must still build the heap (O(n)) and extract all elements (O(n log n)) + +**Why O(n log n) in all cases?** + +Heapsort's time complexity is always O(n log n) because: +- The heap structure requires maintaining the heap property regardless of input order +- Each extraction requires O(log n) time to restore the heap property +- There are n extractions, resulting in O(n log n) total time + +**Space Complexity:** +- In-place version: O(1) - only uses a constant amount of extra space +- Non-in-place version: O(n) - creates a copy of the input array + +**Additional Overheads:** +1. Constant factors: Heapsort has higher constant factors than Quicksort +2. Cache performance: The heap structure has poor cache locality compared to array-based algorithms +3. Comparison overhead: More comparisons than Quicksort on average + +### Sorting Algorithm Comparison + +#### Algorithms Compared + +1. **Heapsort**: O(n log n) in all cases, in-place +2. **Quicksort**: O(n log n) average, O(n²) worst case, in-place +3. **Merge Sort**: O(n log n) in all cases, requires O(n) extra space + +#### Performance Characteristics + +1. **Heapsort**: + - Consistent performance across all input types + - Slightly slower than Quicksort on average due to constant factors + - More predictable than Quicksort (no worst-case degradation) + +2. **Quicksort**: + - Fastest on average for random inputs + - Degrades to O(n²) on sorted/reverse-sorted inputs (without optimizations) + - Excellent cache performance + +3. **Merge Sort**: + - Consistent O(n log n) performance + - Requires additional O(n) space + - Good for external sorting + +**When to Use Heapsort:** +- Guaranteed O(n log n) performance is required +- In-place sorting is necessary +- Worst-case performance must be predictable +- Memory constraints prevent using Merge Sort's O(n) space + +### Priority Queue Implementation Details + +#### Data Structure Choice + +The priority queue is implemented using a **binary heap** represented as a Python list. This choice is justified by: + +1. **Ease of Implementation**: Simple index calculations for parent/child relationships + - Parent of node at index `i`: `(i-1)//2` + - Left child of node at index `i`: `2*i+1` + - Right child of node at index `i`: `2*i+2` + +2. **Efficiency**: + - O(1) access to any element + - O(log n) for heap operations + - No pointer overhead + +3. **Memory Efficiency**: Compact representation without node objects + +#### Task Class Design + +The `Task` class represents individual tasks with: +- **task_id**: Unique identifier +- **priority**: Priority level (higher = more important) +- **arrival_time**: When the task enters the system +- **deadline**: Optional deadline for completion +- **execution_time**: Estimated execution duration +- **description**: Optional task description + +The class implements comparison operators based on priority, enabling natural use in priority queues. + +#### Core Operations Analysis + +**`insert(task)`: O(log n)** +- Add task to the end of the heap array +- Bubble up to maintain heap property +- Time Complexity: O(log n) - height of the heap + +**`extract_max()` / `extract_min()`: O(log n)** +- Remove root element +- Move last element to root +- Bubble down to maintain heap property +- Time Complexity: O(log n) - height of the heap + +**`increase_key(task, new_priority)`: O(n)** +- Find the task in the heap (O(n)) +- Update priority +- Bubble up if necessary (O(log n)) +- Time Complexity: O(n) - linear search dominates +- **Note**: Can be optimized to O(log n) using a hash map for O(1) lookup + +**`decrease_key(task, new_priority)`: O(n)** +- Similar to `increase_key`, but bubbles down instead of up +- Time Complexity: O(n) + +**`is_empty()`: O(1)** +- Simple check of heap size + +**`peek()`: O(1)** +- Returns the root element without removal + +### Task Scheduler Simulation + +#### Implementation + +A complete task scheduler simulation has been implemented using the priority queue. The scheduler demonstrates practical applications of priority queues in operating systems and job scheduling systems. + +#### Scheduler Design + +The `TaskScheduler` class implements a priority-based scheduling algorithm: + +1. **Task Insertion**: All tasks are inserted into a priority queue (O(n log n)) +2. **Task Execution**: Tasks are extracted and executed in priority order (O(n log n)) +3. **Deadline Monitoring**: Each task's deadline is checked upon completion +4. **Statistics Collection**: Comprehensive statistics are collected during scheduling + +#### Time Complexity Analysis + +- **schedule_tasks()**: O(n log n) where n is the number of tasks + - Inserting n tasks: O(n log n) + - Extracting n tasks: O(n log n) +- **get_statistics()**: O(n) to calculate statistics from results +- **Space Complexity**: O(n) for the priority queue + +#### Scheduling Results + +The scheduler simulation demonstrates: +- **Priority-based execution**: High-priority tasks execute first +- **Deadline tracking**: Tasks are monitored for deadline compliance +- **Wait time calculation**: Tracks how long tasks wait before execution +- **Performance metrics**: Throughput, average wait time, and deadline compliance rates + +The scheduler simulation shows that: +- Priority-based scheduling ensures critical tasks execute first +- Pure priority scheduling may miss deadlines for lower-priority tasks with tight deadlines +- The scheduler efficiently handles large workloads (50+ tasks) using O(n log n) algorithm +- Statistics provide valuable insights into scheduling performance + +### Design Decisions + +#### 1. List-Based Heap Representation + +**Decision**: Use Python list instead of node-based tree structure. + +**Rationale**: +- Simpler implementation +- Better memory locality +- Easier index calculations +- No pointer overhead + +**Trade-off**: Slightly less intuitive than tree structure, but more efficient. + +#### 2. Max-Heap vs Min-Heap Configuration + +**Decision**: Support both configurations via constructor parameter. + +**Rationale**: +- Flexibility for different use cases +- Single implementation for both +- Clear API distinction + +#### 3. Task Class Design + +**Decision**: Use dataclass with comparison operators. + +**Rationale**: +- Clean, readable code +- Natural integration with priority queue +- Easy to extend with additional fields + +#### 4. In-Place vs Non-In-Place Sorting + +**Decision**: Support both options in heapsort. + +**Rationale**: +- Flexibility for different use cases +- In-place for memory efficiency +- Non-in-place for preserving original data + +#### 5. Linear Search for Key Updates + +**Decision**: Use linear search instead of hash map for `increase_key`/`decrease_key`. + +**Rationale**: +- Simpler implementation +- No additional space overhead +- Acceptable for small to medium-sized queues +- Can be optimized later if needed + +### Experimental Results + +#### Test Configuration + +Tests were conducted on: +- **Input Sizes**: 100, 1,000, 10,000, 100,000 elements +- **Distributions**: Sorted, reverse-sorted, random +- **Algorithms**: Heapsort, Quicksort, Merge Sort + +#### Key Findings + +1. **Heapsort Performance**: + - Consistent O(n log n) behavior across all input types + - Approximately 1.5-2x slower than optimized Quicksort on random data + - More predictable than Quicksort (no worst-case degradation) + +2. **Priority Queue Performance**: + - Efficient insertion and extraction for large numbers of tasks + - Suitable for real-time scheduling applications + - Linear key updates acceptable for moderate queue sizes + +3. **Scalability**: + - Both implementations scale well with input size + - Performance matches theoretical predictions + +### Conclusion + +This project successfully implements and analyzes: + +1. **Heapsort Algorithm**: A robust, O(n log n) sorting algorithm suitable for scenarios requiring guaranteed performance +2. **Priority Queue**: An efficient data structure for task scheduling and priority-based processing +3. **Task Scheduler**: A complete simulation demonstrating practical applications + +#### Key Achievements + +- ✅ Complete, well-documented implementations +- ✅ Comprehensive complexity analysis +- ✅ Empirical performance comparisons +- ✅ Extensive test coverage (70+ tests) +- ✅ Modular, maintainable code structure +- ✅ Task scheduler simulation with statistics + +#### Future Improvements + +1. **Optimize Key Updates**: Implement hash map for O(log n) key updates +2. **Parallel Heapsort**: Explore parallel heap construction +3. **Adaptive Heapsort**: Optimize for partially sorted inputs +4. **Priority Queue Variants**: Implement binomial heap or Fibonacci heap + +## Results Summary + +### Heapsort Performance +- Consistent O(n log n) performance across all input types +- Slightly slower than Quicksort on average due to constant factors +- More predictable than Quicksort (no worst-case O(n²) scenario) +- Comparable to Merge Sort but with better space efficiency (in-place) + +The performance plots above demonstrate: +- **Heapsort**: Consistent performance regardless of input distribution +- **Quicksort**: Fastest on random data, but degrades significantly on sorted/reverse-sorted inputs +- **Merge Sort**: Consistent performance but requires O(n) extra space + +### Priority Queue Performance +- Efficient insertion and extraction operations +- Suitable for task scheduling applications +- Can handle large numbers of tasks efficiently + +### Generating Performance Plots + +To regenerate the performance comparison plots: + +```bash +python3 examples/generate_plots.py +``` + +This will generate visualization plots in the `docs/` directory comparing all three sorting algorithms. + +## Contributing + +This is an academic assignment. For questions or issues, please contact: +- **Carlos Gutierrez** +- **Email**: cgutierrez44833@ucumberlands.edu + +## License + +See LICENSE file for details. + +## References + +1. Cormen, T. H., Leiserson, C. E., Rivest, R. L., & Stein, C. (2009). *Introduction to Algorithms* (3rd ed.). MIT Press. +2. Sedgewick, R., & Wayne, K. (2011). *Algorithms* (4th ed.). Addison-Wesley Professional. +3. Python Software Foundation. (2023). *Python Documentation*. https://docs.python.org/3/ + +## Acknowledgments + +This implementation follows standard algorithms and data structures as described in classical computer science textbooks. The code is designed for educational purposes and academic submission. + diff --git a/docs/.gitkeep b/docs/.gitkeep new file mode 100644 index 0000000..2047a4f --- /dev/null +++ b/docs/.gitkeep @@ -0,0 +1,3 @@ +# This directory contains generated performance comparison plots +# Run examples/generate_plots.py to regenerate the plots + diff --git a/docs/algorithm_distributions.png b/docs/algorithm_distributions.png new file mode 100644 index 0000000..80d7a57 Binary files /dev/null and b/docs/algorithm_distributions.png differ diff --git a/docs/sorting_comparison.png b/docs/sorting_comparison.png new file mode 100644 index 0000000..322b907 Binary files /dev/null and b/docs/sorting_comparison.png differ diff --git a/docs/sorting_comparison_bar.png b/docs/sorting_comparison_bar.png new file mode 100644 index 0000000..636383e Binary files /dev/null and b/docs/sorting_comparison_bar.png differ diff --git a/examples/__init__.py b/examples/__init__.py new file mode 100644 index 0000000..2356fc8 --- /dev/null +++ b/examples/__init__.py @@ -0,0 +1,12 @@ +""" +Example Scripts for MSCS532 Assignment 4 + +This package contains demonstration scripts for: +- Heapsort implementation +- Priority Queue usage +- Sorting algorithm comparisons + +Author: Carlos Gutierrez +Email: cgutierrez44833@ucumberlands.edu +""" + diff --git a/examples/comparison_demo.py b/examples/comparison_demo.py new file mode 100644 index 0000000..53a3a1b --- /dev/null +++ b/examples/comparison_demo.py @@ -0,0 +1,51 @@ +#!/usr/bin/env python3 +""" +Sorting Algorithm Comparison Demonstration + +This script demonstrates the empirical comparison of Heapsort, +Quicksort, and Merge Sort on different input sizes and distributions. + +Author: Carlos Gutierrez +Email: cgutierrez44833@ucumberlands.edu +""" + +import sys +import os + +# Add parent directory to path +sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))) + +from src.comparison import run_comparison + + +def main(): + """Run sorting algorithm comparison.""" + print("\n" + "=" * 80) + print("SORTING ALGORITHM COMPARISON DEMONSTRATION") + print("Author: Carlos Gutierrez") + print("Email: cgutierrez44833@ucumberlands.edu") + print("=" * 80) + + # Run comparison with different sizes + # Note: Large sizes may take significant time + sizes = [100, 1000, 10000] + + print(f"\nComparing Heapsort, Quicksort, and Merge Sort") + print(f"Input sizes: {sizes}") + print(f"\nNote: This may take a few moments...\n") + + results = run_comparison(sizes=sizes) + + print("\n" + "=" * 80) + print("COMPARISON COMPLETE") + print("=" * 80) + print("\nKey Observations:") + print("1. Heapsort: Consistent O(n log n) performance across all input types") + print("2. Quicksort: Fastest on average, but degrades on sorted inputs") + print("3. Merge Sort: Consistent performance, requires O(n) extra space") + print("\nSee README.md for detailed analysis.\n") + + +if __name__ == "__main__": + main() + diff --git a/examples/generate_plots.py b/examples/generate_plots.py new file mode 100644 index 0000000..119fe72 --- /dev/null +++ b/examples/generate_plots.py @@ -0,0 +1,155 @@ +#!/usr/bin/env python3 +""" +Generate Performance Comparison Plots + +This script generates visualization plots comparing Heapsort, Quicksort, +and Merge Sort performance on different input sizes and distributions. + +Author: Carlos Gutierrez +Email: cgutierrez44833@ucumberlands.edu +""" + +import sys +import os +import matplotlib.pyplot as plt +import numpy as np + +# Add parent directory to path +sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))) + +from src.comparison import compare_sorting_algorithms + + +def generate_performance_plots(): + """Generate performance comparison plots.""" + print("Generating performance comparison plots...") + + # Test sizes - using smaller sizes for faster generation + sizes = [100, 500, 1000, 5000, 10000] + + print("Running performance comparisons...") + results = compare_sorting_algorithms(sizes) + + # Create output directory if it doesn't exist + output_dir = os.path.join(os.path.dirname(__file__), '..', 'docs') + os.makedirs(output_dir, exist_ok=True) + + # Extract data for plotting + algorithms = ['heapsort', 'quicksort', 'merge_sort'] + distributions = ['sorted', 'reverse_sorted', 'random'] + + # Create figure with subplots + fig, axes = plt.subplots(1, 3, figsize=(18, 5)) + fig.suptitle('Sorting Algorithm Performance Comparison', fontsize=16, fontweight='bold') + + colors = { + 'heapsort': '#2E86AB', + 'quicksort': '#A23B72', + 'merge_sort': '#F18F01' + } + + markers = { + 'heapsort': 'o', + 'quicksort': 's', + 'merge_sort': '^' + } + + for idx, dist in enumerate(distributions): + ax = axes[idx] + + for algo in algorithms: + times = results[algo][dist] + ax.plot(sizes, times, + marker=markers[algo], + color=colors[algo], + linewidth=2, + markersize=8, + label=algo.replace('_', ' ').title()) + + ax.set_xlabel('Input Size (n)', fontsize=12) + ax.set_ylabel('Time (seconds)', fontsize=12) + ax.set_title(f'{dist.replace("_", " ").title()} Input', fontsize=13, fontweight='bold') + ax.grid(True, alpha=0.3) + ax.legend(fontsize=10) + ax.set_xscale('log') + ax.set_yscale('log') + + plt.tight_layout() + plot_path = os.path.join(output_dir, 'sorting_comparison.png') + plt.savefig(plot_path, dpi=300, bbox_inches='tight') + print(f"Saved plot to: {plot_path}") + plt.close() + + # Create a combined comparison plot + fig, ax = plt.subplots(figsize=(12, 8)) + + x = np.arange(len(sizes)) + width = 0.25 + + for i, algo in enumerate(algorithms): + # Use random distribution for comparison + times = results[algo]['random'] + ax.bar(x + i * width, times, width, + label=algo.replace('_', ' ').title(), + color=colors[algo], + alpha=0.8) + + ax.set_xlabel('Input Size (n)', fontsize=12, fontweight='bold') + ax.set_ylabel('Time (seconds)', fontsize=12, fontweight='bold') + ax.set_title('Sorting Algorithm Performance on Random Input', fontsize=14, fontweight='bold') + ax.set_xticks(x + width) + ax.set_xticklabels([str(s) for s in sizes]) + ax.legend(fontsize=11) + ax.grid(True, alpha=0.3, axis='y') + + plt.tight_layout() + bar_plot_path = os.path.join(output_dir, 'sorting_comparison_bar.png') + plt.savefig(bar_plot_path, dpi=300, bbox_inches='tight') + print(f"Saved bar chart to: {bar_plot_path}") + plt.close() + + # Create a line plot showing all distributions for each algorithm + fig, axes = plt.subplots(1, 3, figsize=(18, 5)) + fig.suptitle('Algorithm Performance Across Different Input Distributions', fontsize=16, fontweight='bold') + + for idx, algo in enumerate(algorithms): + ax = axes[idx] + + for dist in distributions: + times = results[algo][dist] + ax.plot(sizes, times, + marker='o', + linewidth=2, + markersize=6, + label=dist.replace('_', ' ').title()) + + ax.set_xlabel('Input Size (n)', fontsize=12) + ax.set_ylabel('Time (seconds)', fontsize=12) + ax.set_title(f'{algo.replace("_", " ").title()} Performance', fontsize=13, fontweight='bold') + ax.grid(True, alpha=0.3) + ax.legend(fontsize=10) + ax.set_xscale('log') + ax.set_yscale('log') + + plt.tight_layout() + algo_dist_plot_path = os.path.join(output_dir, 'algorithm_distributions.png') + plt.savefig(algo_dist_plot_path, dpi=300, bbox_inches='tight') + print(f"Saved algorithm distribution plot to: {algo_dist_plot_path}") + plt.close() + + print("\nAll plots generated successfully!") + return { + 'comparison': plot_path, + 'bar_chart': bar_plot_path, + 'distributions': algo_dist_plot_path + } + + +if __name__ == "__main__": + try: + generate_performance_plots() + except ImportError: + print("Error: matplotlib is required for generating plots.") + print("Install it with: pip install matplotlib") + sys.exit(1) + diff --git a/examples/heapsort_demo.py b/examples/heapsort_demo.py new file mode 100644 index 0000000..c5897bd --- /dev/null +++ b/examples/heapsort_demo.py @@ -0,0 +1,167 @@ +#!/usr/bin/env python3 +""" +Heapsort Demonstration Script + +This script demonstrates the usage of the heapsort implementation +with various examples and use cases. + +Author: Carlos Gutierrez +Email: cgutierrez44833@ucumberlands.edu +""" + +import sys +import os + +# Add parent directory to path +sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))) + +from src.heapsort import heapsort, heap_extract_max, heap_insert, _build_max_heap + + +def demo_basic_sorting(): + """Demonstrate basic heapsort functionality.""" + print("=" * 80) + print("BASIC HEAPSORT DEMONSTRATION") + print("=" * 80) + + # Example 1: Simple array + print("\n1. Sorting a simple array:") + arr = [12, 11, 13, 5, 6, 7] + print(f" Original: {arr}") + sorted_arr = heapsort(arr.copy(), inplace=False) + print(f" Sorted: {sorted_arr}") + + # Example 2: Already sorted array + print("\n2. Sorting an already sorted array:") + arr = [1, 2, 3, 4, 5] + print(f" Original: {arr}") + sorted_arr = heapsort(arr.copy(), inplace=False) + print(f" Sorted: {sorted_arr}") + + # Example 3: Reverse sorted array + print("\n3. Sorting a reverse-sorted array:") + arr = [5, 4, 3, 2, 1] + print(f" Original: {arr}") + sorted_arr = heapsort(arr.copy(), inplace=False) + print(f" Sorted: {sorted_arr}") + + # Example 4: Array with duplicates + print("\n4. Sorting an array with duplicate elements:") + arr = [3, 1, 4, 1, 5, 9, 2, 6, 5, 3, 5] + print(f" Original: {arr}") + sorted_arr = heapsort(arr.copy(), inplace=False) + print(f" Sorted: {sorted_arr}") + + # Example 5: In-place sorting + print("\n5. In-place sorting:") + arr = [64, 34, 25, 12, 22, 11, 90] + print(f" Before: {arr}") + heapsort(arr, inplace=True) + print(f" After: {arr}") + + +def demo_heap_operations(): + """Demonstrate heap utility operations.""" + print("\n" + "=" * 80) + print("HEAP OPERATIONS DEMONSTRATION") + print("=" * 80) + + # Build max-heap + print("\n1. Building a max-heap:") + arr = [4, 10, 3, 5, 1] + print(f" Original array: {arr}") + _build_max_heap(arr) + print(f" Max-heap: {arr}") + print(f" Root (max): {arr[0]}") + + # Extract maximum + print("\n2. Extracting maximum from heap:") + heap = [10, 5, 3, 4, 1] + _build_max_heap(heap) + print(f" Heap before: {heap}") + max_val = heap_extract_max(heap) + print(f" Extracted: {max_val}") + print(f" Heap after: {heap}") + + # Insert into heap + print("\n3. Inserting into heap:") + heap = [10, 5, 3, 4, 1] + _build_max_heap(heap) + print(f" Heap before: {heap}") + heap_insert(heap, 15) + print(f" Inserted 15") + print(f" Heap after: {heap}") + print(f" New root: {heap[0]}") + + +def demo_custom_key(): + """Demonstrate sorting with custom key function.""" + print("\n" + "=" * 80) + print("CUSTOM KEY FUNCTION DEMONSTRATION") + print("=" * 80) + + # Sort dictionaries by value + print("\n1. Sorting dictionaries by 'value' key:") + arr = [ + {'name': 'Alice', 'value': 30}, + {'name': 'Bob', 'value': 20}, + {'name': 'Charlie', 'value': 40}, + {'name': 'David', 'value': 10} + ] + print(f" Original: {[d['name'] for d in arr]}") + sorted_arr = heapsort(arr.copy(), key=lambda x: x['value'], inplace=False) + print(f" Sorted by value: {[d['name'] for d in sorted_arr]}") + + # Sort tuples by second element + print("\n2. Sorting tuples by second element:") + arr = [(1, 5), (2, 3), (3, 8), (4, 1)] + print(f" Original: {arr}") + sorted_arr = heapsort(arr.copy(), key=lambda x: x[1], inplace=False) + print(f" Sorted: {sorted_arr}") + + +def demo_performance(): + """Demonstrate performance on different input sizes.""" + print("\n" + "=" * 80) + print("PERFORMANCE DEMONSTRATION") + print("=" * 80) + + import time + import random + + sizes = [100, 1000, 10000] + + print("\nSorting random arrays of different sizes:") + print(f"{'Size':<10} {'Time (seconds)':<20} {'Sorted':<10}") + print("-" * 40) + + for size in sizes: + arr = [random.randint(1, size * 10) for _ in range(size)] + start = time.perf_counter() + sorted_arr = heapsort(arr.copy(), inplace=False) + end = time.perf_counter() + is_sorted = sorted_arr == sorted(arr) + print(f"{size:<10} {end - start:<20.6f} {str(is_sorted):<10}") + + +def main(): + """Run all demonstrations.""" + print("\n" + "=" * 80) + print("HEAPSORT IMPLEMENTATION DEMONSTRATION") + print("Author: Carlos Gutierrez") + print("Email: cgutierrez44833@ucumberlands.edu") + print("=" * 80) + + demo_basic_sorting() + demo_heap_operations() + demo_custom_key() + demo_performance() + + print("\n" + "=" * 80) + print("DEMONSTRATION COMPLETE") + print("=" * 80 + "\n") + + +if __name__ == "__main__": + main() + diff --git a/examples/priority_queue_demo.py b/examples/priority_queue_demo.py new file mode 100644 index 0000000..3cd14f0 --- /dev/null +++ b/examples/priority_queue_demo.py @@ -0,0 +1,233 @@ +#!/usr/bin/env python3 +""" +Priority Queue Demonstration Script + +This script demonstrates the usage of the priority queue implementation +with various examples and use cases. + +Author: Carlos Gutierrez +Email: cgutierrez44833@ucumberlands.edu +""" + +import sys +import os + +# Add parent directory to path +sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))) + +from src.priority_queue import PriorityQueue +from src.task import Task + + +def demo_basic_operations(): + """Demonstrate basic priority queue operations.""" + print("=" * 80) + print("BASIC PRIORITY QUEUE OPERATIONS") + print("=" * 80) + + # Create a max-heap priority queue + pq = PriorityQueue(is_max_heap=True) + + print("\n1. Creating and inserting tasks:") + tasks = [ + Task("T1", priority=10, arrival_time=0.0, description="Task 1"), + Task("T2", priority=5, arrival_time=1.0, description="Task 2"), + Task("T3", priority=15, arrival_time=2.0, description="Task 3"), + Task("T4", priority=20, arrival_time=3.0, description="Task 4"), + Task("T5", priority=8, arrival_time=4.0, description="Task 5") + ] + + for task in tasks: + pq.insert(task) + print(f" Inserted: {task.task_id} (priority: {task.priority})") + + print(f"\n Queue size: {pq.size()}") + print(f" Is empty: {pq.is_empty()}") + + # Peek at highest priority + print("\n2. Peeking at highest priority task:") + top_task = pq.peek() + print(f" Top task: {top_task.task_id} (priority: {top_task.priority})") + print(f" Queue size after peek: {pq.size()} (unchanged)") + + # Extract tasks in priority order + print("\n3. Extracting tasks in priority order:") + while not pq.is_empty(): + task = pq.extract_max() + print(f" Extracted: {task.task_id} (priority: {task.priority})") + + print(f"\n Queue size: {pq.size()}") + print(f" Is empty: {pq.is_empty()}") + + +def demo_min_heap(): + """Demonstrate min-heap priority queue.""" + print("\n" + "=" * 80) + print("MIN-HEAP PRIORITY QUEUE") + print("=" * 80) + + pq = PriorityQueue(is_max_heap=False) + + print("\nInserting tasks into min-heap:") + tasks = [ + Task("T1", priority=10, arrival_time=0.0), + Task("T2", priority=5, arrival_time=1.0), + Task("T3", priority=15, arrival_time=2.0), + Task("T4", priority=20, arrival_time=3.0), + Task("T5", priority=8, arrival_time=4.0) + ] + + for task in tasks: + pq.insert(task) + print(f" Inserted: {task.task_id} (priority: {task.priority})") + + print("\nExtracting tasks (lowest priority first):") + while not pq.is_empty(): + task = pq.extract_min() + print(f" Extracted: {task.task_id} (priority: {task.priority})") + + +def demo_key_operations(): + """Demonstrate priority key update operations.""" + print("\n" + "=" * 80) + print("PRIORITY KEY UPDATE OPERATIONS") + print("=" * 80) + + pq = PriorityQueue(is_max_heap=True) + + # Insert tasks + task1 = Task("T1", priority=10, arrival_time=0.0) + task2 = Task("T2", priority=20, arrival_time=1.0) + task3 = Task("T3", priority=15, arrival_time=2.0) + + pq.insert(task1) + pq.insert(task2) + pq.insert(task3) + + print("\nInitial state:") + print(f" Top task: {pq.peek().task_id} (priority: {pq.peek().priority})") + + # Increase priority + print("\n1. Increasing T1's priority from 10 to 25:") + success = pq.increase_key(task1, 25) + print(f" Update successful: {success}") + print(f" New priority: {task1.priority}") + print(f" Top task: {pq.peek().task_id} (priority: {pq.peek().priority})") + + # Decrease priority + print("\n2. Decreasing T1's priority from 25 to 12:") + success = pq.decrease_key(task1, 12) + print(f" Update successful: {success}") + print(f" New priority: {task1.priority}") + print(f" Top task: {pq.peek().task_id} (priority: {pq.peek().priority})") + + +def demo_task_scheduling(): + """Demonstrate task scheduling simulation.""" + print("\n" + "=" * 80) + print("TASK SCHEDULING SIMULATION") + print("=" * 80) + + pq = PriorityQueue(is_max_heap=True) + + # Create tasks with deadlines + tasks = [ + Task("T1", priority=10, arrival_time=0.0, deadline=100.0, execution_time=5.0), + Task("T2", priority=15, arrival_time=1.0, deadline=50.0, execution_time=3.0), + Task("T3", priority=8, arrival_time=2.0, deadline=200.0, execution_time=10.0), + Task("T4", priority=20, arrival_time=3.0, deadline=30.0, execution_time=2.0), + Task("T5", priority=12, arrival_time=4.0, deadline=150.0, execution_time=7.0) + ] + + print("\nTasks to schedule:") + for task in tasks: + print(f" {task.task_id}: priority={task.priority}, deadline={task.deadline}, " + f"execution_time={task.execution_time}") + + # Insert all tasks + for task in tasks: + pq.insert(task) + + print("\nScheduling order (by priority):") + current_time = 0.0 + while not pq.is_empty(): + task = pq.extract_max() + print(f"\n Executing: {task.task_id}") + print(f" Priority: {task.priority}") + print(f" Start time: {current_time}") + print(f" Execution time: {task.execution_time}") + print(f" Deadline: {task.deadline}") + + # Check if task will meet deadline + completion_time = current_time + task.execution_time + if task.deadline and completion_time > task.deadline: + print(f" ⚠️ WARNING: Will miss deadline!") + else: + print(f" ✓ Will meet deadline") + + current_time = completion_time + + print(f"\n Total completion time: {current_time}") + + +def demo_large_queue(): + """Demonstrate performance with large queue.""" + print("\n" + "=" * 80) + print("LARGE QUEUE PERFORMANCE") + print("=" * 80) + + import time + + pq = PriorityQueue(is_max_heap=True) + num_tasks = 10000 + + print(f"\nInserting {num_tasks} tasks...") + start = time.perf_counter() + for i in range(num_tasks): + priority = (i * 7) % 100 # Vary priorities + task = Task(f"T{i}", priority=priority, arrival_time=float(i)) + pq.insert(task) + insert_time = time.perf_counter() - start + + print(f" Insert time: {insert_time:.6f} seconds") + print(f" Queue size: {pq.size()}") + + print(f"\nExtracting all tasks...") + start = time.perf_counter() + count = 0 + prev_priority = float('inf') + while not pq.is_empty(): + task = pq.extract_max() + # Verify ordering + assert task.priority <= prev_priority, "Ordering violated!" + prev_priority = task.priority + count += 1 + extract_time = time.perf_counter() - start + + print(f" Extract time: {extract_time:.6f} seconds") + print(f" Tasks extracted: {count}") + print(f" Ordering verified: ✓") + + +def main(): + """Run all demonstrations.""" + print("\n" + "=" * 80) + print("PRIORITY QUEUE IMPLEMENTATION DEMONSTRATION") + print("Author: Carlos Gutierrez") + print("Email: cgutierrez44833@ucumberlands.edu") + print("=" * 80) + + demo_basic_operations() + demo_min_heap() + demo_key_operations() + demo_task_scheduling() + demo_large_queue() + + print("\n" + "=" * 80) + print("DEMONSTRATION COMPLETE") + print("=" * 80 + "\n") + + +if __name__ == "__main__": + main() + diff --git a/examples/scheduler_simulation.py b/examples/scheduler_simulation.py new file mode 100644 index 0000000..e742071 --- /dev/null +++ b/examples/scheduler_simulation.py @@ -0,0 +1,163 @@ +#!/usr/bin/env python3 +""" +Task Scheduler Simulation Demonstration + +This script demonstrates the task scheduler implementation using the priority queue. +It shows various scheduling scenarios and analyzes the results. + +Author: Carlos Gutierrez +Email: cgutierrez44833@ucumberlands.edu +""" + +import sys +import os + +# Add parent directory to path +sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))) + +from src.scheduler import TaskScheduler, simulate_scheduler +from src.task import Task + + +def demo_basic_scheduling(): + """Demonstrate basic priority-based scheduling.""" + print("=" * 80) + print("BASIC PRIORITY-BASED SCHEDULING") + print("=" * 80) + + scheduler = TaskScheduler() + + # Create tasks with different priorities + tasks = [ + Task("T1", priority=10, arrival_time=0.0, execution_time=5.0, description="Low priority task"), + Task("T2", priority=30, arrival_time=0.0, execution_time=3.0, description="High priority task"), + Task("T3", priority=20, arrival_time=0.0, execution_time=4.0, description="Medium priority task"), + Task("T4", priority=15, arrival_time=0.0, execution_time=2.0, description="Medium-low priority task"), + ] + + print("\nTasks to schedule (in priority order):") + for task in sorted(tasks, key=lambda t: t.priority, reverse=True): + print(f" {task.task_id}: priority={task.priority}, execution_time={task.execution_time}") + + results = scheduler.schedule_tasks(tasks) + scheduler.print_schedule(results) + + +def demo_deadline_scheduling(): + """Demonstrate scheduling with deadlines.""" + print("\n" + "=" * 80) + print("SCHEDULING WITH DEADLINES") + print("=" * 80) + + scheduler = TaskScheduler() + + # Create tasks with deadlines + tasks = [ + Task("T1", priority=10, arrival_time=0.0, deadline=100.0, execution_time=5.0), + Task("T2", priority=20, arrival_time=0.0, deadline=30.0, execution_time=3.0), + Task("T3", priority=15, arrival_time=0.0, deadline=50.0, execution_time=10.0), + Task("T4", priority=25, arrival_time=0.0, deadline=20.0, execution_time=2.0), + Task("T5", priority=12, arrival_time=0.0, deadline=150.0, execution_time=7.0), + ] + + print("\nTasks with deadlines:") + for task in tasks: + print(f" {task.task_id}: priority={task.priority}, deadline={task.deadline}, " + f"execution_time={task.execution_time}") + + results = scheduler.schedule_tasks(tasks) + scheduler.print_schedule(results) + + +def demo_large_workload(): + """Demonstrate scheduling a large number of tasks.""" + print("\n" + "=" * 80) + print("LARGE WORKLOAD SCHEDULING") + print("=" * 80) + + import random + + # Generate random tasks + num_tasks = 50 + tasks = [] + random.seed(42) # For reproducibility + + for i in range(num_tasks): + priority = random.randint(1, 100) + execution_time = random.uniform(0.5, 10.0) + deadline = random.uniform(10.0, 200.0) + tasks.append( + Task(f"T{i+1}", priority=priority, arrival_time=0.0, + deadline=deadline, execution_time=execution_time) + ) + + print(f"\nScheduling {num_tasks} tasks...") + stats = simulate_scheduler(tasks, verbose=False) + + print(f"\nScheduling Statistics:") + print(f" Total tasks: {stats.total_tasks}") + print(f" Completed: {stats.completed_tasks}") + print(f" Deadline met: {stats.deadline_met} ({stats.deadline_met/stats.total_tasks*100:.1f}%)") + print(f" Deadline missed: {stats.deadline_missed} ({stats.deadline_missed/stats.total_tasks*100:.1f}%)") + print(f" Total execution time: {stats.total_execution_time:.2f}") + print(f" Average wait time: {stats.average_wait_time:.2f}") + print(f" Throughput: {stats.throughput:.2f} tasks/time unit") + + +def demo_priority_vs_deadline(): + """Compare priority-based vs deadline-based scheduling.""" + print("\n" + "=" * 80) + print("PRIORITY-BASED vs DEADLINE-AWARE SCHEDULING") + print("=" * 80) + + # Create tasks where high priority tasks have tight deadlines + tasks = [ + Task("T1", priority=30, arrival_time=0.0, deadline=15.0, execution_time=10.0), + Task("T2", priority=20, arrival_time=0.0, deadline=50.0, execution_time=5.0), + Task("T3", priority=10, arrival_time=0.0, deadline=100.0, execution_time=3.0), + ] + + print("\nScenario: High priority task (T1) has tight deadline") + print("Tasks:") + for task in tasks: + print(f" {task.task_id}: priority={task.priority}, deadline={task.deadline}, " + f"execution_time={task.execution_time}") + + # Priority-based scheduling + scheduler = TaskScheduler() + results = scheduler.schedule_tasks(tasks) + + print("\nPriority-based scheduling (highest priority first):") + scheduler.print_schedule(results) + + # Note: This demonstrates that pure priority scheduling may miss deadlines + # A more sophisticated scheduler could use deadline-aware priority adjustment + + +def main(): + """Run all scheduler demonstrations.""" + print("\n" + "=" * 80) + print("TASK SCHEDULER SIMULATION DEMONSTRATION") + print("Author: Carlos Gutierrez") + print("Email: cgutierrez44833@ucumberlands.edu") + print("=" * 80) + + demo_basic_scheduling() + demo_deadline_scheduling() + demo_large_workload() + demo_priority_vs_deadline() + + print("\n" + "=" * 80) + print("DEMONSTRATION COMPLETE") + print("=" * 80) + print("\nKey Observations:") + print("1. Priority-based scheduling ensures high-priority tasks execute first") + print("2. Pure priority scheduling may miss deadlines for lower-priority tasks") + print("3. The scheduler efficiently handles large workloads using O(n log n) algorithm") + print("4. Statistics provide insights into scheduling performance") + print("=" * 80 + "\n") + + +if __name__ == "__main__": + main() + diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..b7ca8f6 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,21 @@ +# MSCS532 Assignment 4: Heapsort and Priority Queue Implementation +# Python Dependencies + +# Core functionality uses only Python standard library +# No external dependencies are required for basic usage + +# Optional: For generating performance comparison plots +matplotlib>=3.5.0 +numpy>=1.21.0 + +# Optional: For enhanced testing (if desired) +# pytest>=7.0.0 +# pytest-cov>=4.0.0 + +# Optional: For performance profiling (if desired) +# cProfile (built-in) +# line_profiler>=4.0.0 + +# Note: Core functionality works with Python 3.7+ standard library only +# matplotlib and numpy are only needed for generating visualization plots + diff --git a/src/__init__.py b/src/__init__.py new file mode 100644 index 0000000..09294e5 --- /dev/null +++ b/src/__init__.py @@ -0,0 +1,17 @@ +""" +MSCS532 Assignment 4: Heapsort and Priority Queue Implementation + +This package contains implementations of: +- Heapsort algorithm +- Priority Queue data structure +- Task scheduling simulation +- Performance comparison utilities + +Author: Carlos Gutierrez +Email: cgutierrez44833@ucumberlands.edu +""" + +__version__ = "1.0.0" +__author__ = "Carlos Gutierrez" +__email__ = "cgutierrez44833@ucumberlands.edu" + diff --git a/src/comparison.py b/src/comparison.py new file mode 100644 index 0000000..60214b3 --- /dev/null +++ b/src/comparison.py @@ -0,0 +1,243 @@ +""" +Sorting Algorithm Comparison Module + +This module provides utilities to empirically compare the performance of +Heapsort with other sorting algorithms (Quicksort and Merge Sort) on +different input sizes and distributions. + +Author: Carlos Gutierrez +Email: cgutierrez44833@ucumberlands.edu +""" + +import time +import random +from typing import List, Callable, Dict, Tuple +from .heapsort import heapsort + + +def quicksort(arr: List[int], low: int = 0, high: int = None) -> List[int]: + """ + Quicksort implementation for comparison. + + Time Complexity: O(n log n) average, O(n²) worst case + + Args: + arr: Array to sort + low: Starting index + high: Ending index + + Returns: + List[int]: Sorted array + """ + if high is None: + high = len(arr) - 1 + arr = arr.copy() + + # Use iterative approach to avoid recursion depth issues + stack = [(low, high)] + + while stack: + low, high = stack.pop() + + if low < high: + pi = _partition(arr, low, high) + # Push smaller partition first to reduce stack size + if pi - low < high - pi: + stack.append((pi + 1, high)) + stack.append((low, pi - 1)) + else: + stack.append((low, pi - 1)) + stack.append((pi + 1, high)) + + return arr + + +def _partition(arr: List[int], low: int, high: int) -> int: + """Partition function for Quicksort.""" + pivot = arr[high] + i = low - 1 + + for j in range(low, high): + if arr[j] <= pivot: + i += 1 + arr[i], arr[j] = arr[j], arr[i] + + arr[i + 1], arr[high] = arr[high], arr[i + 1] + return i + 1 + + +def merge_sort(arr: List[int]) -> List[int]: + """ + Merge Sort implementation for comparison. + + Time Complexity: O(n log n) in all cases + + Args: + arr: Array to sort + + Returns: + List[int]: Sorted array + """ + if len(arr) <= 1: + return arr.copy() + + mid = len(arr) // 2 + left = merge_sort(arr[:mid]) + right = merge_sort(arr[mid:]) + + return _merge(left, right) + + +def _merge(left: List[int], right: List[int]) -> List[int]: + """Merge function for Merge Sort.""" + result = [] + i = j = 0 + + while i < len(left) and j < len(right): + if left[i] <= right[j]: + result.append(left[i]) + i += 1 + else: + result.append(right[j]) + j += 1 + + result.extend(left[i:]) + result.extend(right[j:]) + return result + + +def generate_sorted_array(n: int) -> List[int]: + """Generate a sorted array of size n.""" + return list(range(n)) + + +def generate_reverse_sorted_array(n: int) -> List[int]: + """Generate a reverse-sorted array of size n.""" + return list(range(n, 0, -1)) + + +def generate_random_array(n: int, seed: int = None) -> List[int]: + """Generate a random array of size n.""" + if seed is not None: + random.seed(seed) + return [random.randint(1, n * 10) for _ in range(n)] + + +def measure_time(func: Callable, arr: List[int]) -> Tuple[float, List[int]]: + """ + Measure the execution time of a sorting function. + + Args: + func: The sorting function to measure + arr: The array to sort + + Returns: + Tuple[float, List[int]]: Execution time in seconds and sorted array + """ + start_time = time.perf_counter() + sorted_arr = func(arr) + end_time = time.perf_counter() + return end_time - start_time, sorted_arr + + +def compare_sorting_algorithms( + sizes: List[int], + distributions: Dict[str, Callable[[int], List[int]]] = None +) -> Dict[str, Dict[str, List[float]]]: + """ + Compare Heapsort, Quicksort, and Merge Sort on different input sizes and distributions. + + Args: + sizes: List of input sizes to test + distributions: Dictionary mapping distribution names to generator functions + + Returns: + Dictionary containing timing results for each algorithm and distribution + + Examples: + >>> results = compare_sorting_algorithms([100, 1000, 10000]) + >>> print(results['heapsort']['random'][0]) + """ + if distributions is None: + distributions = { + 'sorted': generate_sorted_array, + 'reverse_sorted': generate_reverse_sorted_array, + 'random': generate_random_array + } + + algorithms = { + 'heapsort': lambda arr: heapsort(arr.copy()), + 'quicksort': quicksort, + 'merge_sort': merge_sort + } + + results = { + algo: {dist: [] for dist in distributions.keys()} + for algo in algorithms.keys() + } + + for size in sizes: + print(f"Testing with size {size}...") + for dist_name, dist_func in distributions.items(): + arr = dist_func(size) + + for algo_name, algo_func in algorithms.items(): + time_taken, _ = measure_time(algo_func, arr) + results[algo_name][dist_name].append(time_taken) + print(f" {algo_name} ({dist_name}): {time_taken:.6f}s") + + return results + + +def print_comparison_results(results: Dict[str, Dict[str, List[float]]], sizes: List[int]) -> None: + """ + Print comparison results in a formatted table. + + Args: + results: Results dictionary from compare_sorting_algorithms + sizes: List of input sizes that were tested + """ + print("\n" + "=" * 80) + print("SORTING ALGORITHM COMPARISON RESULTS") + print("=" * 80) + + distributions = list(next(iter(results.values())).keys()) + + for dist in distributions: + print(f"\n{dist.upper().replace('_', ' ')} INPUT:") + print("-" * 80) + print(f"{'Size':<10} {'Heapsort':<15} {'Quicksort':<15} {'Merge Sort':<15}") + print("-" * 80) + + for i, size in enumerate(sizes): + heapsort_time = results['heapsort'][dist][i] + quicksort_time = results['quicksort'][dist][i] + merge_sort_time = results['merge_sort'][dist][i] + + print(f"{size:<10} {heapsort_time:<15.6f} {quicksort_time:<15.6f} {merge_sort_time:<15.6f}") + + print("-" * 80) + + +def run_comparison(sizes: List[int] = None) -> Dict[str, Dict[str, List[float]]]: + """ + Run a complete comparison of sorting algorithms. + + Args: + sizes: List of input sizes to test (default: [100, 1000, 10000, 100000]) + + Returns: + Dictionary containing timing results + """ + if sizes is None: + sizes = [100, 1000, 10000, 100000] + + print("Starting sorting algorithm comparison...") + print(f"Testing sizes: {sizes}") + print() + + results = compare_sorting_algorithms(sizes) + print_comparison_results(results, sizes) + + return results + diff --git a/src/heapsort.py b/src/heapsort.py new file mode 100644 index 0000000..7b24fd3 --- /dev/null +++ b/src/heapsort.py @@ -0,0 +1,217 @@ +""" +Heapsort Implementation + +This module provides a complete implementation of the Heapsort algorithm +using a max-heap data structure. The implementation includes: +- Max-heap construction +- Heap property maintenance +- In-place sorting with O(n log n) time complexity + +Author: Carlos Gutierrez +Email: cgutierrez44833@ucumberlands.edu +""" + +from typing import List, TypeVar, Callable + +T = TypeVar('T') + + +def _heapify(arr: List[T], n: int, i: int, key: Callable[[T], T] = lambda x: x) -> None: + """ + Maintain the max-heap property for a subtree rooted at index i. + + This function assumes that the subtrees rooted at left and right children + are already max-heaps, and ensures that the subtree rooted at i is also a max-heap. + + Time Complexity: O(log n) where n is the size of the heap + + Args: + arr: The array representing the heap + n: Size of the heap (may be smaller than len(arr)) + i: Index of the root of the subtree to heapify + key: Optional function to extract comparison key from elements + + Examples: + >>> arr = [4, 10, 3, 5, 1] + >>> _heapify(arr, 5, 0) + >>> arr + [10, 5, 3, 4, 1] + """ + largest = i # Initialize largest as root + left = 2 * i + 1 # Left child index + right = 2 * i + 2 # Right child index + + # Compare root with left child + if left < n and key(arr[left]) > key(arr[largest]): + largest = left + + # Compare largest with right child + if right < n and key(arr[right]) > key(arr[largest]): + largest = right + + # If largest is not root, swap and continue heapifying + if largest != i: + arr[i], arr[largest] = arr[largest], arr[i] + _heapify(arr, n, largest, key) + + +def _build_max_heap(arr: List[T], key: Callable[[T], T] = lambda x: x) -> None: + """ + Build a max-heap from an unsorted array. + + This function rearranges the array elements to satisfy the max-heap property. + It starts from the last non-leaf node and works backwards to the root. + + Time Complexity: O(n) - linear time despite nested loops + + Args: + arr: The array to convert into a max-heap + key: Optional function to extract comparison key from elements + + Examples: + >>> arr = [4, 10, 3, 5, 1] + >>> _build_max_heap(arr) + >>> arr + [10, 5, 3, 4, 1] + """ + n = len(arr) + # Start from the last non-leaf node and work backwards + # Last non-leaf node is at index (n // 2) - 1 + for i in range(n // 2 - 1, -1, -1): + _heapify(arr, n, i, key) + + +def heapsort(arr: List[T], key: Callable[[T], T] = lambda x: x, inplace: bool = True) -> List[T]: + """ + Sort an array using the Heapsort algorithm. + + Heapsort is an in-place sorting algorithm with O(n log n) time complexity + in all cases (worst, average, and best). It works by: + 1. Building a max-heap from the input array + 2. Repeatedly extracting the maximum element and placing it at the end + 3. Reducing the heap size and maintaining the heap property + + Time Complexity: O(n log n) in all cases + Space Complexity: O(1) for in-place sorting, O(n) if not in-place + + Args: + arr: The array to sort + key: Optional function to extract comparison key from elements + inplace: If True, sort in-place (modifies original array). If False, returns a new sorted array. + + Returns: + List[T]: The sorted array (same reference if inplace=True, new list if inplace=False) + + Examples: + >>> arr = [12, 11, 13, 5, 6, 7] + >>> heapsort(arr) + [5, 6, 7, 11, 12, 13] + >>> arr + [5, 6, 7, 11, 12, 13] + + >>> arr = [3, 1, 4, 1, 5, 9, 2, 6] + >>> sorted_arr = heapsort(arr, inplace=False) + >>> sorted_arr + [1, 1, 2, 3, 4, 5, 6, 9] + >>> arr + [3, 1, 4, 1, 5, 9, 2, 6] + """ + if not arr: + return arr + + # Create a copy if not sorting in-place + if not inplace: + arr = arr.copy() + + n = len(arr) + + # Step 1: Build max-heap + _build_max_heap(arr, key) + + # Step 2: Extract elements one by one + for i in range(n - 1, 0, -1): + # Move current root (maximum) to end + arr[0], arr[i] = arr[i], arr[0] + + # Reduce heap size and heapify the root + _heapify(arr, i, 0, key) + + return arr + + +def heap_extract_max(arr: List[T], key: Callable[[T], T] = lambda x: x) -> T: + """ + Extract and return the maximum element from a max-heap. + + This function removes the maximum element from the heap and maintains + the heap property. The heap is assumed to be a valid max-heap. + + Time Complexity: O(log n) + + Args: + arr: The max-heap array + key: Optional function to extract comparison key from elements + + Returns: + T: The maximum element + + Raises: + IndexError: If the heap is empty + + Examples: + >>> heap = [10, 5, 3, 4, 1] + >>> max_val = heap_extract_max(heap) + >>> max_val + 10 + >>> heap + [5, 4, 3, 1] + """ + if not arr: + raise IndexError("Cannot extract from empty heap") + + if len(arr) == 1: + return arr.pop() + + # Store the maximum (root) + max_val = arr[0] + + # Move last element to root + arr[0] = arr.pop() + + # Heapify to maintain heap property + _heapify(arr, len(arr), 0, key) + + return max_val + + +def heap_insert(arr: List[T], item: T, key: Callable[[T], T] = lambda x: x) -> None: + """ + Insert an element into a max-heap. + + This function adds a new element to the heap and maintains the heap property + by bubbling up the element if necessary. + + Time Complexity: O(log n) + + Args: + arr: The max-heap array + item: The element to insert + key: Optional function to extract comparison key from elements + + Examples: + >>> heap = [10, 5, 3, 4, 1] + >>> heap_insert(heap, 15) + >>> heap + [15, 10, 3, 4, 1, 5] + """ + arr.append(item) + i = len(arr) - 1 + + # Bubble up: compare with parent and swap if necessary + while i > 0: + parent = (i - 1) // 2 + if key(arr[parent]) >= key(arr[i]): + break + arr[parent], arr[i] = arr[i], arr[parent] + i = parent + diff --git a/src/priority_queue.py b/src/priority_queue.py new file mode 100644 index 0000000..022f7d1 --- /dev/null +++ b/src/priority_queue.py @@ -0,0 +1,379 @@ +""" +Priority Queue Implementation + +This module implements a Priority Queue data structure using a binary heap. +The implementation supports both max-heap (highest priority first) and +min-heap (lowest priority first) configurations. + +The priority queue is implemented using a Python list to represent the binary heap, +which provides efficient access to parent and child nodes through index calculations. + +Author: Carlos Gutierrez +Email: cgutierrez44833@ucumberlands.edu +""" + +from typing import List, Optional, TypeVar, Callable +from .task import Task + +T = TypeVar('T') + + +class PriorityQueue: + """ + A Priority Queue implementation using a binary heap. + + This class supports both max-heap and min-heap configurations. By default, + it uses a max-heap where higher priority values are extracted first. + + The heap is implemented using a list, where for a node at index i: + - Parent is at index (i-1)//2 + - Left child is at index 2*i+1 + - Right child is at index 2*i+2 + + Attributes: + heap (List[T]): The list representing the binary heap + is_max_heap (bool): True for max-heap, False for min-heap + key (Callable): Function to extract priority/comparison key + + Time Complexity Analysis: + - insert(): O(log n) - bubble up operation + - extract_max()/extract_min(): O(log n) - heapify operation + - increase_key()/decrease_key(): O(log n) - bubble up/down + - is_empty(): O(1) - constant time check + - peek(): O(1) - constant time access to root + + Space Complexity: O(n) where n is the number of elements + + Examples: + >>> pq = PriorityQueue() + >>> pq.insert(Task("T1", priority=10, arrival_time=0.0)) + >>> pq.insert(Task("T2", priority=5, arrival_time=1.0)) + >>> pq.insert(Task("T3", priority=15, arrival_time=2.0)) + >>> task = pq.extract_max() + >>> task.task_id + 'T3' + """ + + def __init__(self, is_max_heap: bool = True, key: Optional[Callable[[T], int]] = None): + """ + Initialize an empty priority queue. + + Args: + is_max_heap: If True, use max-heap (higher priority first). + If False, use min-heap (lower priority first). + key: Optional function to extract priority from elements. + If None, elements are compared directly. + + Examples: + >>> pq = PriorityQueue(is_max_heap=True) + >>> pq.is_empty() + True + """ + self.heap: List[T] = [] + self.is_max_heap = is_max_heap + self.key = key if key is not None else (lambda x: x.priority if isinstance(x, Task) else x) + + def _compare(self, a: T, b: T) -> bool: + """ + Compare two elements based on heap type. + + Args: + a: First element + b: Second element + + Returns: + bool: True if a should be above b in the heap + """ + val_a = self.key(a) + val_b = self.key(b) + if self.is_max_heap: + return val_a > val_b + else: + return val_a < val_b + + def _heapify_up(self, index: int) -> None: + """ + Maintain heap property by bubbling up an element. + + Time Complexity: O(log n) + + Args: + index: Index of the element to bubble up + """ + while index > 0: + parent = (index - 1) // 2 + if self._compare(self.heap[index], self.heap[parent]): + self.heap[index], self.heap[parent] = self.heap[parent], self.heap[index] + index = parent + else: + break + + def _heapify_down(self, index: int) -> None: + """ + Maintain heap property by bubbling down an element. + + Time Complexity: O(log n) + + Args: + index: Index of the element to bubble down + """ + n = len(self.heap) + while True: + largest_or_smallest = index + left = 2 * index + 1 + right = 2 * index + 2 + + # Compare with left child + if left < n and self._compare(self.heap[left], self.heap[largest_or_smallest]): + largest_or_smallest = left + + # Compare with right child + if right < n and self._compare(self.heap[right], self.heap[largest_or_smallest]): + largest_or_smallest = right + + # If element is in correct position, stop + if largest_or_smallest == index: + break + + # Swap and continue + self.heap[index], self.heap[largest_or_smallest] = \ + self.heap[largest_or_smallest], self.heap[index] + index = largest_or_smallest + + def insert(self, item: T) -> None: + """ + Insert an item into the priority queue. + + The item is added to the end of the heap and then bubbled up + to maintain the heap property. + + Time Complexity: O(log n) where n is the number of elements + + Args: + item: The item to insert + + Examples: + >>> pq = PriorityQueue() + >>> pq.insert(Task("T1", priority=10, arrival_time=0.0)) + >>> pq.size() + 1 + """ + self.heap.append(item) + self._heapify_up(len(self.heap) - 1) + + def extract_max(self) -> T: + """ + Extract and return the item with the highest priority (max-heap). + + This operation removes the root of the heap, replaces it with the + last element, and maintains the heap property. + + Time Complexity: O(log n) + + Returns: + T: The item with the highest priority + + Raises: + IndexError: If the priority queue is empty + + Examples: + >>> pq = PriorityQueue() + >>> pq.insert(Task("T1", priority=10, arrival_time=0.0)) + >>> pq.insert(Task("T2", priority=5, arrival_time=1.0)) + >>> task = pq.extract_max() + >>> task.priority + 10 + """ + if self.is_empty(): + raise IndexError("Cannot extract from empty priority queue") + + if len(self.heap) == 1: + return self.heap.pop() + + root = self.heap[0] + self.heap[0] = self.heap.pop() + self._heapify_down(0) + + return root + + def extract_min(self) -> T: + """ + Extract and return the item with the lowest priority (min-heap). + + This operation removes the root of the heap, replaces it with the + last element, and maintains the heap property. + + Time Complexity: O(log n) + + Returns: + T: The item with the lowest priority + + Raises: + IndexError: If the priority queue is empty + + Examples: + >>> pq = PriorityQueue(is_max_heap=False) + >>> pq.insert(Task("T1", priority=10, arrival_time=0.0)) + >>> pq.insert(Task("T2", priority=5, arrival_time=1.0)) + >>> task = pq.extract_min() + >>> task.priority + 5 + """ + if self.is_empty(): + raise IndexError("Cannot extract from empty priority queue") + + if len(self.heap) == 1: + return self.heap.pop() + + root = self.heap[0] + self.heap[0] = self.heap.pop() + self._heapify_down(0) + + return root + + def increase_key(self, item: T, new_priority: int) -> bool: + """ + Increase the priority of an existing item in the priority queue. + + This operation finds the item, updates its priority, and bubbles it up + if necessary to maintain the heap property. + + Time Complexity: O(n) to find the item + O(log n) to bubble up = O(n) + Note: This could be optimized to O(log n) with a hash map for O(1) lookup + + Args: + item: The item whose priority should be increased + new_priority: The new priority value + + Returns: + bool: True if the item was found and updated, False otherwise + + Examples: + >>> pq = PriorityQueue() + >>> task = Task("T1", priority=10, arrival_time=0.0) + >>> pq.insert(task) + >>> pq.increase_key(task, 20) + True + >>> task.priority + 20 + """ + # Find the item in the heap + try: + index = self.heap.index(item) + except ValueError: + return False + + # Update priority + if isinstance(item, Task): + item.update_priority(new_priority) + + # Bubble up if necessary + self._heapify_up(index) + return True + + def decrease_key(self, item: T, new_priority: int) -> bool: + """ + Decrease the priority of an existing item in the priority queue. + + This operation finds the item, updates its priority, and bubbles it down + if necessary to maintain the heap property. + + Time Complexity: O(n) to find the item + O(log n) to bubble down = O(n) + Note: This could be optimized to O(log n) with a hash map for O(1) lookup + + Args: + item: The item whose priority should be decreased + new_priority: The new priority value + + Returns: + bool: True if the item was found and updated, False otherwise + + Examples: + >>> pq = PriorityQueue() + >>> task = Task("T1", priority=20, arrival_time=0.0) + >>> pq.insert(task) + >>> pq.decrease_key(task, 10) + True + >>> task.priority + 10 + """ + # Find the item in the heap + try: + index = self.heap.index(item) + except ValueError: + return False + + # Update priority + if isinstance(item, Task): + item.update_priority(new_priority) + + # Bubble down if necessary + self._heapify_down(index) + return True + + def is_empty(self) -> bool: + """ + Check if the priority queue is empty. + + Time Complexity: O(1) + + Returns: + bool: True if the priority queue is empty, False otherwise + + Examples: + >>> pq = PriorityQueue() + >>> pq.is_empty() + True + >>> pq.insert(Task("T1", priority=10, arrival_time=0.0)) + >>> pq.is_empty() + False + """ + return len(self.heap) == 0 + + def size(self) -> int: + """ + Get the number of items in the priority queue. + + Time Complexity: O(1) + + Returns: + int: The number of items in the priority queue + + Examples: + >>> pq = PriorityQueue() + >>> pq.size() + 0 + >>> pq.insert(Task("T1", priority=10, arrival_time=0.0)) + >>> pq.size() + 1 + """ + return len(self.heap) + + def peek(self) -> Optional[T]: + """ + Get the highest (or lowest) priority item without removing it. + + Time Complexity: O(1) + + Returns: + Optional[T]: The root item, or None if the queue is empty + + Examples: + >>> pq = PriorityQueue() + >>> pq.insert(Task("T1", priority=10, arrival_time=0.0)) + >>> task = pq.peek() + >>> task.task_id + 'T1' + """ + if self.is_empty(): + return None + return self.heap[0] + + def __str__(self) -> str: + """String representation of the priority queue.""" + return f"PriorityQueue(size={self.size()}, is_max_heap={self.is_max_heap})" + + def __repr__(self) -> str: + """Detailed string representation.""" + return self.__str__() + diff --git a/src/scheduler.py b/src/scheduler.py new file mode 100644 index 0000000..ef589cf --- /dev/null +++ b/src/scheduler.py @@ -0,0 +1,272 @@ +""" +Task Scheduler Simulation + +This module implements a task scheduler using the priority queue data structure. +The scheduler demonstrates how priority queues can be used for task scheduling +in operating systems, job queues, and other scheduling applications. + +The scheduler supports: +- Priority-based scheduling (highest priority first) +- Deadline monitoring +- Execution time tracking +- Scheduling statistics and analysis + +Author: Carlos Gutierrez +Email: cgutierrez44833@ucumberlands.edu +""" + +from typing import List, Dict, Optional +from dataclasses import dataclass +from .priority_queue import PriorityQueue +from .task import Task + + +@dataclass +class SchedulingResult: + """ + Represents the result of scheduling a task. + + Attributes: + task_id (str): ID of the scheduled task + start_time (float): Time when task execution started + completion_time (float): Time when task execution completed + deadline_met (bool): Whether the task met its deadline + wait_time (float): Time the task waited before execution + """ + task_id: str + start_time: float + completion_time: float + deadline_met: bool + wait_time: float + + +@dataclass +class SchedulingStatistics: + """ + Statistics from a scheduling simulation. + + Attributes: + total_tasks (int): Total number of tasks scheduled + completed_tasks (int): Number of tasks that completed + deadline_met (int): Number of tasks that met their deadlines + deadline_missed (int): Number of tasks that missed their deadlines + total_execution_time (float): Total time spent executing tasks + average_wait_time (float): Average wait time for all tasks + throughput (float): Tasks completed per unit time + """ + total_tasks: int + completed_tasks: int + deadline_met: int + deadline_missed: int + total_execution_time: float + average_wait_time: float + throughput: float + + +class TaskScheduler: + """ + A priority-based task scheduler using a priority queue. + + This scheduler implements a priority-based scheduling algorithm where + tasks with higher priority are executed first. The scheduler maintains + a priority queue and executes tasks in priority order. + + Time Complexity Analysis: + - schedule_tasks(): O(n log n) where n is the number of tasks + - Inserting n tasks: O(n log n) + - Extracting n tasks: O(n log n) + - Space Complexity: O(n) for the priority queue + + Examples: + >>> scheduler = TaskScheduler() + >>> tasks = [ + ... Task("T1", priority=10, arrival_time=0.0, deadline=100.0, execution_time=5.0), + ... Task("T2", priority=20, arrival_time=0.0, deadline=50.0, execution_time=3.0) + ... ] + >>> results = scheduler.schedule_tasks(tasks) + >>> print(f"Scheduled {len(results)} tasks") + Scheduled 2 tasks + """ + + def __init__(self): + """Initialize an empty task scheduler.""" + self.priority_queue = PriorityQueue(is_max_heap=True) + self.current_time = 0.0 + + def schedule_tasks(self, tasks: List[Task]) -> List[SchedulingResult]: + """ + Schedule and execute a list of tasks based on priority. + + This method implements a priority-based scheduling algorithm: + 1. All tasks are inserted into the priority queue + 2. Tasks are extracted and executed in priority order + 3. Execution times and deadlines are tracked + + Time Complexity: O(n log n) where n is the number of tasks + - Inserting n tasks: O(n log n) + - Extracting n tasks: O(n log n) + + Args: + tasks: List of tasks to schedule + + Returns: + List[SchedulingResult]: Results of scheduling each task + + Examples: + >>> scheduler = TaskScheduler() + >>> tasks = [ + ... Task("T1", priority=10, arrival_time=0.0, execution_time=5.0), + ... Task("T2", priority=20, arrival_time=0.0, execution_time=3.0) + ... ] + >>> results = scheduler.schedule_tasks(tasks) + >>> results[0].task_id + 'T2' + """ + # Reset scheduler state + self.priority_queue = PriorityQueue(is_max_heap=True) + self.current_time = 0.0 + results: List[SchedulingResult] = [] + + # Insert all tasks into priority queue + # Time Complexity: O(n log n) for n insertions + for task in tasks: + self.priority_queue.insert(task) + + # Execute tasks in priority order + # Time Complexity: O(n log n) for n extractions + while not self.priority_queue.is_empty(): + task = self.priority_queue.extract_max() + + # Calculate scheduling metrics + start_time = self.current_time + wait_time = start_time - task.arrival_time + completion_time = start_time + task.execution_time + + # Check if deadline is met + deadline_met = True + if task.deadline is not None: + deadline_met = completion_time <= task.deadline + + # Create result + result = SchedulingResult( + task_id=task.task_id, + start_time=start_time, + completion_time=completion_time, + deadline_met=deadline_met, + wait_time=wait_time + ) + results.append(result) + + # Update current time + self.current_time = completion_time + + return results + + def get_statistics(self, results: List[SchedulingResult]) -> SchedulingStatistics: + """ + Calculate scheduling statistics from results. + + Time Complexity: O(n) where n is the number of results + + Args: + results: List of scheduling results + + Returns: + SchedulingStatistics: Calculated statistics + """ + if not results: + return SchedulingStatistics( + total_tasks=0, + completed_tasks=0, + deadline_met=0, + deadline_missed=0, + total_execution_time=0.0, + average_wait_time=0.0, + throughput=0.0 + ) + + total_tasks = len(results) + completed_tasks = len(results) + deadline_met = sum(1 for r in results if r.deadline_met) + deadline_missed = total_tasks - deadline_met + + total_execution_time = max(r.completion_time for r in results) if results else 0.0 + average_wait_time = sum(r.wait_time for r in results) / total_tasks if total_tasks > 0 else 0.0 + throughput = completed_tasks / total_execution_time if total_execution_time > 0 else 0.0 + + return SchedulingStatistics( + total_tasks=total_tasks, + completed_tasks=completed_tasks, + deadline_met=deadline_met, + deadline_missed=deadline_missed, + total_execution_time=total_execution_time, + average_wait_time=average_wait_time, + throughput=throughput + ) + + def print_schedule(self, results: List[SchedulingResult]) -> None: + """ + Print a formatted schedule of task execution. + + Args: + results: List of scheduling results to display + """ + print("\n" + "=" * 80) + print("TASK SCHEDULING RESULTS") + print("=" * 80) + print(f"{'Task ID':<10} {'Start':<12} {'Completion':<12} {'Wait':<12} {'Deadline':<10}") + print("-" * 80) + + for result in results: + deadline_status = "✓ Met" if result.deadline_met else "✗ Missed" + print(f"{result.task_id:<10} {result.start_time:<12.2f} " + f"{result.completion_time:<12.2f} {result.wait_time:<12.2f} {deadline_status:<10}") + + print("-" * 80) + + # Print statistics + stats = self.get_statistics(results) + print(f"\nStatistics:") + print(f" Total tasks: {stats.total_tasks}") + print(f" Completed: {stats.completed_tasks}") + print(f" Deadline met: {stats.deadline_met}") + print(f" Deadline missed: {stats.deadline_missed}") + print(f" Total execution time: {stats.total_execution_time:.2f}") + print(f" Average wait time: {stats.average_wait_time:.2f}") + print(f" Throughput: {stats.throughput:.2f} tasks/time unit") + print("=" * 80) + + +def simulate_scheduler(tasks: List[Task], verbose: bool = True) -> SchedulingStatistics: + """ + Simulate a task scheduler with the given tasks. + + This is a convenience function that creates a scheduler, schedules tasks, + and returns statistics. + + Time Complexity: O(n log n) where n is the number of tasks + + Args: + tasks: List of tasks to schedule + verbose: If True, print the schedule + + Returns: + SchedulingStatistics: Statistics from the simulation + + Examples: + >>> tasks = [ + ... Task("T1", priority=10, arrival_time=0.0, deadline=100.0, execution_time=5.0), + ... Task("T2", priority=20, arrival_time=0.0, deadline=50.0, execution_time=3.0) + ... ] + >>> stats = simulate_scheduler(tasks, verbose=False) + >>> stats.total_tasks + 2 + """ + scheduler = TaskScheduler() + results = scheduler.schedule_tasks(tasks) + + if verbose: + scheduler.print_schedule(results) + + return scheduler.get_statistics(results) + diff --git a/src/task.py b/src/task.py new file mode 100644 index 0000000..212bb07 --- /dev/null +++ b/src/task.py @@ -0,0 +1,124 @@ +""" +Task Module + +This module defines the Task class used to represent individual tasks +in the priority queue implementation. Each task contains information +such as task ID, priority, arrival time, and deadline. + +Author: Carlos Gutierrez +Email: cgutierrez44833@ucumberlands.edu +""" + +from dataclasses import dataclass +from typing import Optional + + +@dataclass +class Task: + """ + Represents a task with priority, timing, and identification information. + + Attributes: + task_id (str): Unique identifier for the task + priority (int): Priority level (higher values = higher priority) + arrival_time (float): Time when the task arrives in the system + deadline (Optional[float]): Deadline for task completion (None if no deadline) + execution_time (float): Estimated time required to execute the task + description (str): Optional description of the task + + Examples: + >>> task = Task("T1", priority=10, arrival_time=0.0, deadline=100.0) + >>> print(task) + Task(task_id='T1', priority=10, arrival_time=0.0, deadline=100.0, execution_time=1.0, description='') + """ + + task_id: str + priority: int + arrival_time: float + deadline: Optional[float] = None + execution_time: float = 1.0 + description: str = "" + + def __lt__(self, other: 'Task') -> bool: + """ + Compare tasks by priority (for min-heap: lower priority first). + + Args: + other: Another Task object to compare with + + Returns: + bool: True if this task has lower priority than other + """ + return self.priority < other.priority + + def __gt__(self, other: 'Task') -> bool: + """ + Compare tasks by priority (for max-heap: higher priority first). + + Args: + other: Another Task object to compare with + + Returns: + bool: True if this task has higher priority than other + """ + return self.priority > other.priority + + def __eq__(self, other: 'Task') -> bool: + """ + Check if two tasks have the same priority. + + Args: + other: Another Task object to compare with + + Returns: + bool: True if tasks have the same priority + """ + if not isinstance(other, Task): + return False + return self.priority == other.priority + + def __le__(self, other: 'Task') -> bool: + """Less than or equal comparison.""" + return self.priority <= other.priority + + def __ge__(self, other: 'Task') -> bool: + """Greater than or equal comparison.""" + return self.priority >= other.priority + + def update_priority(self, new_priority: int) -> None: + """ + Update the priority of the task. + + Args: + new_priority: The new priority value + """ + self.priority = new_priority + + def is_overdue(self, current_time: float) -> bool: + """ + Check if the task has passed its deadline. + + Args: + current_time: The current time in the system + + Returns: + bool: True if deadline exists and has passed + """ + if self.deadline is None: + return False + return current_time > self.deadline + + def time_until_deadline(self, current_time: float) -> Optional[float]: + """ + Calculate the time remaining until the deadline. + + Args: + current_time: The current time in the system + + Returns: + Optional[float]: Time remaining until deadline, or None if no deadline + """ + if self.deadline is None: + return None + return max(0.0, self.deadline - current_time) + diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..c84576c --- /dev/null +++ b/tests/__init__.py @@ -0,0 +1,13 @@ +""" +Test Suite for MSCS532 Assignment 4 + +This package contains comprehensive tests for: +- Heapsort implementation +- Priority Queue implementation +- Task class +- Sorting algorithm comparisons + +Author: Carlos Gutierrez +Email: cgutierrez44833@ucumberlands.edu +""" + diff --git a/tests/test_comparison.py b/tests/test_comparison.py new file mode 100644 index 0000000..0b4ece0 --- /dev/null +++ b/tests/test_comparison.py @@ -0,0 +1,120 @@ +""" +Test Suite for Sorting Algorithm Comparison + +This module contains tests for the comparison utilities and sorting algorithms. + +Author: Carlos Gutierrez +Email: cgutierrez44833@ucumberlands.edu +""" + +import unittest +import sys +import os + +# Add parent directory to path to import src modules +sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))) + +from src.comparison import ( + quicksort, merge_sort, + generate_sorted_array, generate_reverse_sorted_array, generate_random_array +) + + +class TestSortingAlgorithms(unittest.TestCase): + """Test cases for sorting algorithm implementations.""" + + def test_quicksort_empty(self): + """Test quicksort on empty array.""" + arr = [] + result = quicksort(arr) + self.assertEqual(result, []) + + def test_quicksort_single(self): + """Test quicksort on single element.""" + arr = [42] + result = quicksort(arr) + self.assertEqual(result, [42]) + + def test_quicksort_sorted(self): + """Test quicksort on sorted array.""" + arr = [1, 2, 3, 4, 5] + result = quicksort(arr) + self.assertEqual(result, [1, 2, 3, 4, 5]) + + def test_quicksort_reverse(self): + """Test quicksort on reverse-sorted array.""" + arr = [5, 4, 3, 2, 1] + result = quicksort(arr) + self.assertEqual(result, [1, 2, 3, 4, 5]) + + def test_quicksort_random(self): + """Test quicksort on random array.""" + arr = [3, 1, 4, 1, 5, 9, 2, 6] + result = quicksort(arr) + self.assertEqual(result, [1, 1, 2, 3, 4, 5, 6, 9]) + + def test_merge_sort_empty(self): + """Test merge sort on empty array.""" + arr = [] + result = merge_sort(arr) + self.assertEqual(result, []) + + def test_merge_sort_single(self): + """Test merge sort on single element.""" + arr = [42] + result = merge_sort(arr) + self.assertEqual(result, [42]) + + def test_merge_sort_sorted(self): + """Test merge sort on sorted array.""" + arr = [1, 2, 3, 4, 5] + result = merge_sort(arr) + self.assertEqual(result, [1, 2, 3, 4, 5]) + + def test_merge_sort_reverse(self): + """Test merge sort on reverse-sorted array.""" + arr = [5, 4, 3, 2, 1] + result = merge_sort(arr) + self.assertEqual(result, [1, 2, 3, 4, 5]) + + def test_merge_sort_random(self): + """Test merge sort on random array.""" + arr = [3, 1, 4, 1, 5, 9, 2, 6] + result = merge_sort(arr) + self.assertEqual(result, [1, 1, 2, 3, 4, 5, 6, 9]) + + +class TestArrayGenerators(unittest.TestCase): + """Test cases for array generator functions.""" + + def test_generate_sorted_array(self): + """Test generating sorted array.""" + arr = generate_sorted_array(5) + self.assertEqual(arr, [0, 1, 2, 3, 4]) + self.assertEqual(len(arr), 5) + + def test_generate_reverse_sorted_array(self): + """Test generating reverse-sorted array.""" + arr = generate_reverse_sorted_array(5) + self.assertEqual(arr, [5, 4, 3, 2, 1]) + self.assertEqual(len(arr), 5) + + def test_generate_random_array(self): + """Test generating random array.""" + arr = generate_random_array(10, seed=42) + self.assertEqual(len(arr), 10) + # With same seed, should get same array + arr2 = generate_random_array(10, seed=42) + self.assertEqual(arr, arr2) + + def test_generate_random_array_different_seeds(self): + """Test that different seeds produce different arrays.""" + arr1 = generate_random_array(100, seed=1) + arr2 = generate_random_array(100, seed=2) + # Very unlikely to be the same + self.assertNotEqual(arr1, arr2) + + +if __name__ == '__main__': + unittest.main() + diff --git a/tests/test_heapsort.py b/tests/test_heapsort.py new file mode 100644 index 0000000..bbffcc7 --- /dev/null +++ b/tests/test_heapsort.py @@ -0,0 +1,156 @@ +""" +Test Suite for Heapsort Implementation + +This module contains comprehensive tests for the heapsort algorithm, +including edge cases, different data types, and correctness verification. + +Author: Carlos Gutierrez +Email: cgutierrez44833@ucumberlands.edu +""" + +import unittest +import sys +import os + +# Add parent directory to path to import src modules +sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))) + +from src.heapsort import heapsort, heap_extract_max, heap_insert, _build_max_heap, _heapify + + +class TestHeapsort(unittest.TestCase): + """Test cases for heapsort function.""" + + def test_empty_array(self): + """Test sorting an empty array.""" + arr = [] + result = heapsort(arr) + self.assertEqual(result, []) + + def test_single_element(self): + """Test sorting an array with a single element.""" + arr = [42] + result = heapsort(arr) + self.assertEqual(result, [42]) + + def test_already_sorted(self): + """Test sorting an already sorted array.""" + arr = [1, 2, 3, 4, 5] + result = heapsort(arr) + self.assertEqual(result, [1, 2, 3, 4, 5]) + + def test_reverse_sorted(self): + """Test sorting a reverse-sorted array.""" + arr = [5, 4, 3, 2, 1] + result = heapsort(arr) + self.assertEqual(result, [1, 2, 3, 4, 5]) + + def test_random_array(self): + """Test sorting a random array.""" + arr = [12, 11, 13, 5, 6, 7] + result = heapsort(arr) + self.assertEqual(result, [5, 6, 7, 11, 12, 13]) + + def test_duplicate_elements(self): + """Test sorting an array with duplicate elements.""" + arr = [3, 1, 4, 1, 5, 9, 2, 6, 5, 3, 5] + result = heapsort(arr) + self.assertEqual(result, [1, 1, 2, 3, 3, 4, 5, 5, 5, 6, 9]) + + def test_negative_numbers(self): + """Test sorting an array with negative numbers.""" + arr = [-5, -2, -8, 1, 3, -1] + result = heapsort(arr) + self.assertEqual(result, [-8, -5, -2, -1, 1, 3]) + + def test_large_array(self): + """Test sorting a large array.""" + arr = list(range(1000, 0, -1)) + result = heapsort(arr) + self.assertEqual(result, list(range(1, 1001))) + + def test_inplace_sorting(self): + """Test that inplace sorting modifies the original array.""" + arr = [3, 1, 4, 1, 5] + original_id = id(arr) + result = heapsort(arr, inplace=True) + self.assertEqual(id(result), original_id) + self.assertEqual(arr, [1, 1, 3, 4, 5]) + + def test_not_inplace_sorting(self): + """Test that non-inplace sorting doesn't modify the original array.""" + arr = [3, 1, 4, 1, 5] + original_arr = arr.copy() + result = heapsort(arr, inplace=False) + self.assertNotEqual(id(result), id(arr)) + self.assertEqual(arr, original_arr) + self.assertEqual(result, [1, 1, 3, 4, 5]) + + def test_custom_key_function(self): + """Test sorting with a custom key function.""" + arr = [{'value': 3}, {'value': 1}, {'value': 4}] + result = heapsort(arr, key=lambda x: x['value'], inplace=False) + self.assertEqual([x['value'] for x in result], [1, 3, 4]) + + +class TestHeapOperations(unittest.TestCase): + """Test cases for heap utility functions.""" + + def test_heapify(self): + """Test the heapify function.""" + arr = [4, 10, 3, 5, 1] + _heapify(arr, 5, 0) + # After heapify, root should be the maximum + self.assertEqual(arr[0], 10) + + def test_build_max_heap(self): + """Test building a max-heap from an array.""" + arr = [4, 10, 3, 5, 1] + _build_max_heap(arr) + # Root should be maximum + self.assertEqual(arr[0], 10) + # Verify heap property: parent >= children + for i in range(len(arr)): + left = 2 * i + 1 + right = 2 * i + 2 + if left < len(arr): + self.assertGreaterEqual(arr[i], arr[left]) + if right < len(arr): + self.assertGreaterEqual(arr[i], arr[right]) + + def test_heap_extract_max(self): + """Test extracting maximum from a heap.""" + heap = [10, 5, 3, 4, 1] + _build_max_heap(heap) + max_val = heap_extract_max(heap) + self.assertEqual(max_val, 10) + self.assertEqual(len(heap), 4) + # Verify heap property is maintained + self.assertEqual(heap[0], 5) + + def test_heap_extract_max_empty(self): + """Test extracting from an empty heap raises error.""" + heap = [] + with self.assertRaises(IndexError): + heap_extract_max(heap) + + def test_heap_insert(self): + """Test inserting into a heap.""" + heap = [10, 5, 3, 4, 1] + _build_max_heap(heap) + heap_insert(heap, 15) + # New maximum should be at root + self.assertEqual(heap[0], 15) + # Verify heap property + for i in range(len(heap)): + left = 2 * i + 1 + right = 2 * i + 2 + if left < len(heap): + self.assertGreaterEqual(heap[i], heap[left]) + if right < len(heap): + self.assertGreaterEqual(heap[i], heap[right]) + + +if __name__ == '__main__': + unittest.main() + diff --git a/tests/test_priority_queue.py b/tests/test_priority_queue.py new file mode 100644 index 0000000..2e1d1a6 --- /dev/null +++ b/tests/test_priority_queue.py @@ -0,0 +1,223 @@ +""" +Test Suite for Priority Queue Implementation + +This module contains comprehensive tests for the Priority Queue data structure, +including all core operations and edge cases. + +Author: Carlos Gutierrez +Email: cgutierrez44833@ucumberlands.edu +""" + +import unittest +import sys +import os + +# Add parent directory to path to import src modules +sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))) + +from src.priority_queue import PriorityQueue +from src.task import Task + + +class TestPriorityQueue(unittest.TestCase): + """Test cases for Priority Queue implementation.""" + + def setUp(self): + """Set up test fixtures.""" + self.pq = PriorityQueue(is_max_heap=True) + + def test_initialization(self): + """Test priority queue initialization.""" + pq = PriorityQueue() + self.assertTrue(pq.is_empty()) + self.assertEqual(pq.size(), 0) + self.assertTrue(pq.is_max_heap) + + def test_initialization_min_heap(self): + """Test min-heap initialization.""" + pq = PriorityQueue(is_max_heap=False) + self.assertTrue(pq.is_empty()) + self.assertFalse(pq.is_max_heap) + + def test_insert_single_task(self): + """Test inserting a single task.""" + task = Task("T1", priority=10, arrival_time=0.0) + self.pq.insert(task) + self.assertFalse(self.pq.is_empty()) + self.assertEqual(self.pq.size(), 1) + + def test_insert_multiple_tasks(self): + """Test inserting multiple tasks.""" + tasks = [ + Task("T1", priority=10, arrival_time=0.0), + Task("T2", priority=5, arrival_time=1.0), + Task("T3", priority=15, arrival_time=2.0) + ] + for task in tasks: + self.pq.insert(task) + self.assertEqual(self.pq.size(), 3) + + def test_extract_max_ordering(self): + """Test that extract_max returns tasks in priority order.""" + tasks = [ + Task("T1", priority=10, arrival_time=0.0), + Task("T2", priority=5, arrival_time=1.0), + Task("T3", priority=15, arrival_time=2.0), + Task("T4", priority=20, arrival_time=3.0) + ] + for task in tasks: + self.pq.insert(task) + + # Should extract in descending priority order + self.assertEqual(self.pq.extract_max().priority, 20) + self.assertEqual(self.pq.extract_max().priority, 15) + self.assertEqual(self.pq.extract_max().priority, 10) + self.assertEqual(self.pq.extract_max().priority, 5) + self.assertTrue(self.pq.is_empty()) + + def test_extract_max_empty(self): + """Test extracting from empty queue raises error.""" + with self.assertRaises(IndexError): + self.pq.extract_max() + + def test_extract_min_ordering(self): + """Test that extract_min returns tasks in ascending priority order.""" + pq = PriorityQueue(is_max_heap=False) + tasks = [ + Task("T1", priority=10, arrival_time=0.0), + Task("T2", priority=5, arrival_time=1.0), + Task("T3", priority=15, arrival_time=2.0), + Task("T4", priority=20, arrival_time=3.0) + ] + for task in tasks: + pq.insert(task) + + # Should extract in ascending priority order + self.assertEqual(pq.extract_min().priority, 5) + self.assertEqual(pq.extract_min().priority, 10) + self.assertEqual(pq.extract_min().priority, 15) + self.assertEqual(pq.extract_min().priority, 20) + self.assertTrue(pq.is_empty()) + + def test_peek(self): + """Test peeking at the highest priority task.""" + tasks = [ + Task("T1", priority=10, arrival_time=0.0), + Task("T2", priority=5, arrival_time=1.0), + Task("T3", priority=15, arrival_time=2.0) + ] + for task in tasks: + self.pq.insert(task) + + peeked = self.pq.peek() + self.assertEqual(peeked.priority, 15) + # Peek should not remove the element + self.assertEqual(self.pq.size(), 3) + + def test_peek_empty(self): + """Test peeking at empty queue returns None.""" + self.assertIsNone(self.pq.peek()) + + def test_increase_key(self): + """Test increasing the priority of a task.""" + task = Task("T1", priority=10, arrival_time=0.0) + self.pq.insert(task) + self.pq.insert(Task("T2", priority=20, arrival_time=1.0)) + + # Initially, T2 should be at root + self.assertEqual(self.pq.peek().priority, 20) + + # Increase T1's priority + success = self.pq.increase_key(task, 25) + self.assertTrue(success) + self.assertEqual(task.priority, 25) + + # Now T1 should be at root + self.assertEqual(self.pq.peek().priority, 25) + self.assertEqual(self.pq.peek().task_id, "T1") + + def test_increase_key_not_found(self): + """Test increasing key of non-existent task.""" + task = Task("T1", priority=10, arrival_time=0.0) + success = self.pq.increase_key(task, 20) + self.assertFalse(success) + + def test_decrease_key(self): + """Test decreasing the priority of a task.""" + task = Task("T1", priority=20, arrival_time=0.0) + self.pq.insert(task) + self.pq.insert(Task("T2", priority=10, arrival_time=1.0)) + + # Initially, T1 should be at root + self.assertEqual(self.pq.peek().priority, 20) + + # Decrease T1's priority + success = self.pq.decrease_key(task, 5) + self.assertTrue(success) + self.assertEqual(task.priority, 5) + + # Now T2 should be at root + self.assertEqual(self.pq.peek().priority, 10) + self.assertEqual(self.pq.peek().task_id, "T2") + + def test_decrease_key_not_found(self): + """Test decreasing key of non-existent task.""" + task = Task("T1", priority=10, arrival_time=0.0) + success = self.pq.decrease_key(task, 5) + self.assertFalse(success) + + def test_is_empty(self): + """Test is_empty method.""" + self.assertTrue(self.pq.is_empty()) + self.pq.insert(Task("T1", priority=10, arrival_time=0.0)) + self.assertFalse(self.pq.is_empty()) + self.pq.extract_max() + self.assertTrue(self.pq.is_empty()) + + def test_size(self): + """Test size method.""" + self.assertEqual(self.pq.size(), 0) + for i in range(5): + self.pq.insert(Task(f"T{i}", priority=i, arrival_time=float(i))) + self.assertEqual(self.pq.size(), i + 1) + + for i in range(5): + self.pq.extract_max() + self.assertEqual(self.pq.size(), 4 - i) + + def test_large_queue(self): + """Test priority queue with many elements.""" + for i in range(1000): + self.pq.insert(Task(f"T{i}", priority=i, arrival_time=float(i))) + + self.assertEqual(self.pq.size(), 1000) + + # Extract all and verify ordering + prev_priority = float('inf') + while not self.pq.is_empty(): + task = self.pq.extract_max() + self.assertLessEqual(task.priority, prev_priority) + prev_priority = task.priority + + def test_duplicate_priorities(self): + """Test handling of tasks with duplicate priorities.""" + tasks = [ + Task("T1", priority=10, arrival_time=0.0), + Task("T2", priority=10, arrival_time=1.0), + Task("T3", priority=10, arrival_time=2.0) + ] + for task in tasks: + self.pq.insert(task) + + # All should be extractable + extracted = [] + while not self.pq.is_empty(): + extracted.append(self.pq.extract_max()) + + self.assertEqual(len(extracted), 3) + self.assertTrue(all(task.priority == 10 for task in extracted)) + + +if __name__ == '__main__': + unittest.main() + diff --git a/tests/test_scheduler.py b/tests/test_scheduler.py new file mode 100644 index 0000000..f5f1a38 --- /dev/null +++ b/tests/test_scheduler.py @@ -0,0 +1,190 @@ +""" +Test Suite for Task Scheduler Implementation + +This module contains comprehensive tests for the task scheduler, +including scheduling algorithms, statistics, and edge cases. + +Author: Carlos Gutierrez +Email: cgutierrez44833@ucumberlands.edu +""" + +import unittest +import sys +import os + +# Add parent directory to path to import src modules +sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))) + +from src.scheduler import TaskScheduler, SchedulingResult, SchedulingStatistics, simulate_scheduler +from src.task import Task + + +class TestTaskScheduler(unittest.TestCase): + """Test cases for TaskScheduler class.""" + + def setUp(self): + """Set up test fixtures.""" + self.scheduler = TaskScheduler() + + def test_basic_scheduling(self): + """Test basic priority-based scheduling.""" + tasks = [ + Task("T1", priority=10, arrival_time=0.0, execution_time=5.0), + Task("T2", priority=20, arrival_time=0.0, execution_time=3.0), + Task("T3", priority=15, arrival_time=0.0, execution_time=4.0), + ] + + results = self.scheduler.schedule_tasks(tasks) + + self.assertEqual(len(results), 3) + # Highest priority should execute first + self.assertEqual(results[0].task_id, "T2") + self.assertEqual(results[1].task_id, "T3") + self.assertEqual(results[2].task_id, "T1") + + def test_scheduling_order(self): + """Test that tasks are scheduled in priority order.""" + tasks = [ + Task("T1", priority=5, arrival_time=0.0, execution_time=1.0), + Task("T2", priority=10, arrival_time=0.0, execution_time=1.0), + Task("T3", priority=15, arrival_time=0.0, execution_time=1.0), + ] + + results = self.scheduler.schedule_tasks(tasks) + + # Should be in descending priority order + priorities = [t.priority for t in tasks] + priorities.sort(reverse=True) + + for i, result in enumerate(results): + # Find the task that matches this result + task = next(t for t in tasks if t.task_id == result.task_id) + self.assertEqual(task.priority, priorities[i]) + + def test_deadline_tracking(self): + """Test that deadlines are correctly tracked.""" + tasks = [ + Task("T1", priority=20, arrival_time=0.0, deadline=10.0, execution_time=5.0), + Task("T2", priority=10, arrival_time=0.0, deadline=100.0, execution_time=20.0), + ] + + results = self.scheduler.schedule_tasks(tasks) + + # T1 should meet deadline (starts at 0, completes at 5, deadline 10) + self.assertTrue(results[0].deadline_met) + + # T2 should also meet deadline (starts at 5, completes at 25, deadline 100) + self.assertTrue(results[1].deadline_met) + + def test_deadline_missed(self): + """Test detection of missed deadlines.""" + tasks = [ + Task("T1", priority=20, arrival_time=0.0, deadline=3.0, execution_time=5.0), + ] + + results = self.scheduler.schedule_tasks(tasks) + + # Task should miss deadline (completes at 5, deadline is 3) + self.assertFalse(results[0].deadline_met) + + def test_wait_time_calculation(self): + """Test wait time calculation.""" + tasks = [ + Task("T1", priority=20, arrival_time=0.0, execution_time=5.0), + Task("T2", priority=10, arrival_time=0.0, execution_time=3.0), + ] + + results = self.scheduler.schedule_tasks(tasks) + + # T1 should have no wait time (executes first) + self.assertEqual(results[0].wait_time, 0.0) + + # T2 should wait for T1 to complete + self.assertEqual(results[1].wait_time, 5.0) + + def test_empty_task_list(self): + """Test scheduling with empty task list.""" + results = self.scheduler.schedule_tasks([]) + self.assertEqual(len(results), 0) + + def test_single_task(self): + """Test scheduling a single task.""" + tasks = [Task("T1", priority=10, arrival_time=0.0, execution_time=5.0)] + results = self.scheduler.schedule_tasks(tasks) + + self.assertEqual(len(results), 1) + self.assertEqual(results[0].task_id, "T1") + self.assertEqual(results[0].start_time, 0.0) + self.assertEqual(results[0].completion_time, 5.0) + self.assertEqual(results[0].wait_time, 0.0) + + +class TestSchedulingStatistics(unittest.TestCase): + """Test cases for scheduling statistics.""" + + def test_statistics_calculation(self): + """Test statistics calculation.""" + scheduler = TaskScheduler() + + tasks = [ + Task("T1", priority=20, arrival_time=0.0, deadline=10.0, execution_time=5.0), + Task("T2", priority=10, arrival_time=0.0, deadline=100.0, execution_time=3.0), + ] + + results = scheduler.schedule_tasks(tasks) + stats = scheduler.get_statistics(results) + + self.assertEqual(stats.total_tasks, 2) + self.assertEqual(stats.completed_tasks, 2) + self.assertEqual(stats.deadline_met, 2) + self.assertEqual(stats.deadline_missed, 0) + self.assertEqual(stats.total_execution_time, 8.0) # 5 + 3 + self.assertGreater(stats.average_wait_time, 0) + self.assertGreater(stats.throughput, 0) + + def test_statistics_with_missed_deadlines(self): + """Test statistics with missed deadlines.""" + scheduler = TaskScheduler() + + tasks = [ + Task("T1", priority=20, arrival_time=0.0, deadline=3.0, execution_time=5.0), + Task("T2", priority=10, arrival_time=0.0, deadline=100.0, execution_time=3.0), + ] + + results = scheduler.schedule_tasks(tasks) + stats = scheduler.get_statistics(results) + + self.assertEqual(stats.deadline_met, 1) + self.assertEqual(stats.deadline_missed, 1) + + def test_statistics_empty_results(self): + """Test statistics with empty results.""" + scheduler = TaskScheduler() + stats = scheduler.get_statistics([]) + + self.assertEqual(stats.total_tasks, 0) + self.assertEqual(stats.completed_tasks, 0) + self.assertEqual(stats.total_execution_time, 0.0) + self.assertEqual(stats.average_wait_time, 0.0) + self.assertEqual(stats.throughput, 0.0) + + +class TestSimulateScheduler(unittest.TestCase): + """Test cases for simulate_scheduler convenience function.""" + + def test_simulate_scheduler(self): + """Test the simulate_scheduler function.""" + tasks = [ + Task("T1", priority=20, arrival_time=0.0, execution_time=5.0), + Task("T2", priority=10, arrival_time=0.0, execution_time=3.0), + ] + + stats = simulate_scheduler(tasks, verbose=False) + + self.assertEqual(stats.total_tasks, 2) + self.assertEqual(stats.completed_tasks, 2) + + +if __name__ == '__main__': + unittest.main() + diff --git a/tests/test_task.py b/tests/test_task.py new file mode 100644 index 0000000..ddacc3b --- /dev/null +++ b/tests/test_task.py @@ -0,0 +1,125 @@ +""" +Test Suite for Task Class + +This module contains tests for the Task class, including comparisons, +priority updates, and deadline checking. + +Author: Carlos Gutierrez +Email: cgutierrez44833@ucumberlands.edu +""" + +import unittest +import sys +import os + +# Add parent directory to path to import src modules +sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))) + +from src.task import Task + + +class TestTask(unittest.TestCase): + """Test cases for Task class.""" + + def test_task_creation(self): + """Test creating a task with all parameters.""" + task = Task( + task_id="T1", + priority=10, + arrival_time=0.0, + deadline=100.0, + execution_time=5.0, + description="Test task" + ) + self.assertEqual(task.task_id, "T1") + self.assertEqual(task.priority, 10) + self.assertEqual(task.arrival_time, 0.0) + self.assertEqual(task.deadline, 100.0) + self.assertEqual(task.execution_time, 5.0) + self.assertEqual(task.description, "Test task") + + def test_task_creation_minimal(self): + """Test creating a task with minimal parameters.""" + task = Task("T1", priority=10, arrival_time=0.0) + self.assertEqual(task.task_id, "T1") + self.assertEqual(task.priority, 10) + self.assertEqual(task.arrival_time, 0.0) + self.assertIsNone(task.deadline) + self.assertEqual(task.execution_time, 1.0) + self.assertEqual(task.description, "") + + def test_task_comparison_lt(self): + """Test less than comparison.""" + task1 = Task("T1", priority=5, arrival_time=0.0) + task2 = Task("T2", priority=10, arrival_time=1.0) + self.assertTrue(task1 < task2) + self.assertFalse(task2 < task1) + + def test_task_comparison_gt(self): + """Test greater than comparison.""" + task1 = Task("T1", priority=10, arrival_time=0.0) + task2 = Task("T2", priority=5, arrival_time=1.0) + self.assertTrue(task1 > task2) + self.assertFalse(task2 > task1) + + def test_task_comparison_eq(self): + """Test equality comparison.""" + task1 = Task("T1", priority=10, arrival_time=0.0) + task2 = Task("T2", priority=10, arrival_time=1.0) + task3 = Task("T3", priority=5, arrival_time=2.0) + self.assertTrue(task1 == task2) + self.assertFalse(task1 == task3) + + def test_task_comparison_le(self): + """Test less than or equal comparison.""" + task1 = Task("T1", priority=5, arrival_time=0.0) + task2 = Task("T2", priority=10, arrival_time=1.0) + task3 = Task("T3", priority=5, arrival_time=2.0) + self.assertTrue(task1 <= task2) + self.assertTrue(task1 <= task3) + self.assertFalse(task2 <= task1) + + def test_task_comparison_ge(self): + """Test greater than or equal comparison.""" + task1 = Task("T1", priority=10, arrival_time=0.0) + task2 = Task("T2", priority=5, arrival_time=1.0) + task3 = Task("T3", priority=10, arrival_time=2.0) + self.assertTrue(task1 >= task2) + self.assertTrue(task1 >= task3) + self.assertFalse(task2 >= task1) + + def test_update_priority(self): + """Test updating task priority.""" + task = Task("T1", priority=10, arrival_time=0.0) + self.assertEqual(task.priority, 10) + task.update_priority(20) + self.assertEqual(task.priority, 20) + + def test_is_overdue_with_deadline(self): + """Test checking if task is overdue.""" + task = Task("T1", priority=10, arrival_time=0.0, deadline=100.0) + self.assertFalse(task.is_overdue(50.0)) + self.assertFalse(task.is_overdue(100.0)) + self.assertTrue(task.is_overdue(150.0)) + + def test_is_overdue_no_deadline(self): + """Test checking overdue status for task without deadline.""" + task = Task("T1", priority=10, arrival_time=0.0) + self.assertFalse(task.is_overdue(1000.0)) + + def test_time_until_deadline(self): + """Test calculating time until deadline.""" + task = Task("T1", priority=10, arrival_time=0.0, deadline=100.0) + self.assertEqual(task.time_until_deadline(50.0), 50.0) + self.assertEqual(task.time_until_deadline(100.0), 0.0) + self.assertEqual(task.time_until_deadline(150.0), 0.0) + + def test_time_until_deadline_no_deadline(self): + """Test time until deadline for task without deadline.""" + task = Task("T1", priority=10, arrival_time=0.0) + self.assertIsNone(task.time_until_deadline(100.0)) + + +if __name__ == '__main__': + unittest.main() +