Building Modular Penetration Testing Tools with Ngoto: A Deep Dive into Plugin Architecture and Concurrent Execution
Penetration testing workflows involve executing dozens of specialized tools across reconnaissance, enumeration, exploitation, and post-exploitation phases. Managing these tools—each with unique invocation patterns, output formats, and configuration requirements—creates friction that slows testing and increases cognitive load.
Ngoto addresses this problem through a plugin-based framework built on three core computer science concepts: decorator-based metaprogramming, tree-structured navigation with bidirectional traversal, and concurrent task scheduling via thread pools. This article examines the technical implementation, design trade-offs, and performance characteristics of building such a system.
The Challenge of Tool Integration in Penetration Testing
Modern penetration tests require coordinating multiple specialized tools across different phases. Each tool exists as a standalone executable with its own command-line interface, requiring security professionals to mentally context-switch between different syntaxes, manage intermediate files, and manually correlate results.
Existing solutions and their limitations:
Shell scripts consolidate tool invocations but provide only a flat namespace with no hierarchical organization. The cognitive load of navigating dozens of flat scripts grows linearly with script count.
Metasploit Framework provides interactive navigation but couples the interface tightly to its exploit database. The architecture is heavyweight, requiring PostgreSQL for module management and significant initialization overhead.
Custom frameworks often reinvent tree traversal, plugin discovery, and task scheduling—orthogonal concerns that distract from actual security testing logic.
Ngoto provides the infrastructure layer—navigation, scheduling, logging—as reusable primitives through a decorator-based API.
Decorator-Based Metaprogramming: Implementation and Trade-offs
The Decorator Pattern in Python
Python decorators are syntactic sugar for higher-order functions that take a function and return a modified version. Understanding their implementation is critical to understanding Ngoto's architecture.
Basic decorator mechanism:
def command(name, aliases, desc):
    """Decorator factory that returns a decorator"""
    def decorator(func):
        """Actual decorator that wraps the function"""
        def wrapper(*args, **kwargs):
            # Could add pre/post processing here
            return func(*args, **kwargs)
        # Attach metadata to wrapper
        wrapper._command_metadata = {
            'name': name,
            'aliases': aliases,
            'desc': desc,
            'func': func
        }
        return wrapper
    return decorator
Ngoto's approach:
Instead of returning a wrapper function, Ngoto decorators return a metadata object that holds both the function reference and configuration:
class Command:
    def __init__(self, name: str, aliases: list, desc: str):
        self.name = name
        self.aliases = aliases
        self.desc = desc
        self.func = None

    def __call__(self, func):
        """Called when decorator is applied to a function"""
        self.func = func
        # Return a lambda that hands back the populated Command object
        # (metadata plus the wrapped function) when called via reflection
        return lambda *args: self
Why return a lambda instead of storing metadata on the function?
This design choice enables reflection-based discovery. When loading cogs, Ngoto calls each method with empty arguments:
def get_object_from_method(method):
    """Returns decorator object from decorated method"""
    num_args = method.__code__.co_argcount
    args = [''] * num_args
    return method(*args)  # Triggers lambda, returns Command object
Trade-offs:
✅ Advantage: Clean separation of metadata from function execution
✅ Advantage: Type-safe decorator objects (Command, Plugin, Task classes)
✅ Advantage: Enables isinstance() checks for type discrimination

❌ Disadvantage: Decorated functions aren't directly callable—requires extracting the .func attribute
❌ Disadvantage: Slightly more memory overhead (one object per decorated method)
❌ Disadvantage: Breaks IDE introspection/autocomplete in some cases
Alternative approaches:
Function attributes (used by Click, Flask):
def command(name, aliases, desc):
    def decorator(func):
        func._is_command = True
        func._command_name = name
        func._command_aliases = aliases
        return func
    return decorator
This is simpler but requires checking multiple attributes. Ngoto's approach provides a single object with all metadata.
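For comparison, discovery with the function-attribute style might look like the following minimal sketch; the discover_commands helper and attribute names are illustrative, not part of any particular library:

def discover_commands(cog) -> list:
    """Collect methods tagged with function attributes (illustrative only)."""
    found = []
    for attr_name in dir(cog):
        method = getattr(cog, attr_name)
        # Metadata is spread across several attributes rather than one object
        if callable(method) and getattr(method, '_is_command', False):
            found.append({
                'name': method._command_name,
                'aliases': method._command_aliases,
                'func': method,  # the function stays directly callable
            })
    return found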
Entry points (used by Pluggy):
# setup.py
entry_points={
    'ngoto.plugins': [
        'network_recon = mypkg.network:ReconPlugin',
    ]
}
This enables third-party plugins but requires package installation and entry point registration. Ngoto prioritizes simplicity for script-based usage.
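As a rough sketch of how entry-point discovery typically works, the standard importlib.metadata API can load anything registered under a group; the 'ngoto.plugins' group name here is taken from the example above, and Ngoto itself does not use this mechanism:

from importlib.metadata import entry_points

def load_entry_point_plugins(group: str = 'ngoto.plugins') -> list:
    """Instantiate plugin classes registered by installed packages (sketch)."""
    plugins = []
    for ep in entry_points(group=group):   # Python 3.10+ selection API
        plugin_cls = ep.load()             # imports e.g. mypkg.network:ReconPlugin
        plugins.append(plugin_cls())
    return plugins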
Reflection and Introspection Mechanics
Ngoto's cog loading system uses Python's reflection capabilities to discover decorated methods:
def load_cogs(self, cog_classes: list):
    """Extract commands, plugins, tasks from cog classes"""
    for cog in cog_classes:
        for method_name in dir(cog):
            if method_name[0] != '_':
                method = getattr(cog, method_name)
                method_object = get_object_from_method(method)
                if isinstance(method_object, Command):
                    self.commands.append(method_object)
                elif isinstance(method_object, Task):
                    method_object.logger = self.logger
                    self.tasks.add_task(method_object)
                elif isinstance(method_object, Plugin):
                    self.add_plugin(method_object, method_object.folder)
Key Python introspection mechanisms used:
- dir(obj): Returns a list of attribute names (includes inherited methods)
- getattr(obj, name): Retrieves an attribute by string name
- method.__code__.co_argcount: Accesses the function code object for its parameter count
- isinstance(obj, class): Runtime type checking
Time complexity analysis:
- dir(): O(n) where n = number of attributes
- getattr(): O(1) dictionary lookup in __dict__
- Reflection per cog: O(n × m) where n = methods, m = decorator types
- Total loading: O(c × n × m) where c = number of cogs
For typical usage (< 20 cogs, < 50 methods each), loading is effectively instantaneous (<10ms).
Memory model:
Each decorated method creates:
- 1 decorator object (Command/Plugin/Task): ~200 bytes
- 1 lambda closure: ~100 bytes
- Metadata strings: variable (typically 50-200 bytes)
For 100 decorated methods: ~35KB overhead—negligible in modern systems.
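These per-object figures are rough estimates; shallow sizes can be spot-checked with sys.getsizeof (CPython-specific, and excluding the referenced strings and lists):

import sys

class Meta:
    """Stand-in for a decorator object like Command."""
    def __init__(self):
        self.name, self.aliases, self.desc, self.func = 'scan', ['s'], 'Run a scan', None

obj = Meta()
rebuild = lambda *args: obj

print(sys.getsizeof(obj), 'bytes (object header)')
print(sys.getsizeof(obj.__dict__), 'bytes (attribute dict)')
print(sys.getsizeof(rebuild), 'bytes (the lambda/function object itself)')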
Tree-Based Navigation: Data Structure and Algorithmic Analysis
Node Structure and Complexity
Ngoto's navigation uses a tree where each node represents a folder or plugin attachment point:
class Node:
    def __init__(self, name: str):
        self.name = name
        self.parent = None    # O(1) back navigation
        self.children = []    # List of child nodes
        self.plugins = []     # List of plugins at this level

    @property
    def num_children(self):
        return len(self.children)

    @property
    def num_plugins(self):
        return len(self.plugins)
Operations and complexity:
| Operation | Implementation | Time Complexity | Space Complexity |
|---|---|---|---|
| Add child | children.append() | O(1) amortized | O(1) |
| Find child by name | Linear search | O(n) | O(1) |
| Add plugin | plugins.append() | O(1) amortized | O(1) |
| Navigate up | node.parent | O(1) | O(1) |
| Navigate down | Direct reference | O(1) | O(1) |
Why list instead of dict for children?
self.children = [] # Current implementation
# Alternative:
self.children = {} # Key: name, Value: Node
Trade-off analysis:
- Lists: O(n) lookup, but n is typically small (< 10 folders per level)
- Dicts: O(1) lookup, but extra memory overhead and hashing for little benefit at small n

For n < 20 the difference is negligible in practice: either structure resolves a child in well under a microsecond, so the simpler list representation costs nothing measurable at typical plugin tree sizes.
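The trade-off is easy to measure on your own machine with timeit; a minimal micro-benchmark sketch (results vary by interpreter and hardware, and either way both approaches are far below human-interaction latency):

from timeit import timeit

names = [f'folder{i}' for i in range(10)]        # typical per-level fan-out
as_list = [(n, object()) for n in names]         # (name, node) pairs
as_dict = dict(as_list)

def list_lookup(target='folder7'):
    for name, node in as_list:
        if name == target:
            return node

def dict_lookup(target='folder7'):
    return as_dict.get(target)

print('list:', timeit(list_lookup, number=100_000))
print('dict:', timeit(dict_lookup, number=100_000))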
Path Resolution Algorithm
When adding a plugin to a path like 'Reconnaissance/Network/DNS', Ngoto traverses or creates the path:
def add_plugin(self, plugin: Plugin, location: str) -> None:
    """Add plugin to tree at specified path"""
    curr_node = self.curr_pos
    for folder in location.split('/'):
        if folder != '':
            if curr_node.has_child(folder):
                # Path exists: traverse
                curr_node = curr_node.get_child_from_name(folder)
            else:
                # Path doesn't exist: create
                new_node = Node(folder)
                curr_node.add_child(new_node)
                curr_node = new_node
    curr_node.add_plugin(plugin)
Complexity analysis:
- split('/'): O(k) where k = path length
- Per folder: O(c) lookup where c = children at that level
- Total: O(k × c)
For typical paths (depth 2-3, < 10 children per level): ~20-30 operations
Memory usage:
- Each Node: ~150 bytes (object overhead + 4 references)
- Total for 50 nodes: ~7.5KB
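A self-contained sketch of the same path-resolution idea, using a trimmed-down Node that provides the helper methods the snippet above assumes (has_child, get_child_from_name, add_child):

class Node:
    def __init__(self, name):
        self.name, self.parent = name, None
        self.children, self.plugins = [], []

    def has_child(self, name):
        return any(c.name == name for c in self.children)

    def get_child_from_name(self, name):
        return next(c for c in self.children if c.name == name)

    def add_child(self, node):
        node.parent = self
        self.children.append(node)

def add_plugin(root, plugin, location):
    """Walk/create the folder path, then attach the plugin at the leaf."""
    curr = root
    for folder in location.split('/'):
        if folder:
            if curr.has_child(folder):
                curr = curr.get_child_from_name(folder)
            else:
                new_node = Node(folder)
                curr.add_child(new_node)
                curr = new_node
    curr.plugins.append(plugin)

root = Node('root')
add_plugin(root, 'dns_enum', 'Reconnaissance/Network/DNS')
add_plugin(root, 'port_scan', 'Reconnaissance/Network/Port')
print([c.name for c in root.children])                          # ['Reconnaissance']
print([c.name for c in root.children[0].children[0].children])  # ['DNS', 'Port']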
Alternative Data Structures Considered
Trie (prefix tree):
class TrieNode:
    def __init__(self):
        self.children = {}   # char -> TrieNode
        self.is_end = False
        self.plugins = []
Better for autocomplete, but overkill for navigation. Path prefixes aren't frequently shared in plugin organization.
Flat dictionary with path keys:
self.plugins = {
    'Reconnaissance/Network/DNS': [plugin1, plugin2],
    'Reconnaissance/Network/Port': [plugin3]
}
O(1) lookup but loses hierarchical display. Navigation becomes a string manipulation problem rather than tree traversal.
B-tree or balanced tree:
Unnecessary complexity for small datasets. Tree balancing overhead exceeds any lookup benefits for n < 1000.
Concurrent Execution Model: Threading and the GIL
ThreadPoolExecutor Architecture
Ngoto uses concurrent.futures.ThreadPoolExecutor for concurrent task execution:
def start(self) -> None:
    """Main event loop with concurrent task scheduling"""
    self.run_command('clear')
    self.run_command('options')
    with ThreadPoolExecutor(max_workers=3) as executor:
        # Submit CLI as background task
        clt_loop = executor.submit(self.clt)
        while True:
            curr_time = time()
            # Check for tasks ready to execute
            self.tasks.check_available_tasks(executor, curr_time)
            # Monitor running task completion
            self.tasks.check_running_tasks(self.logger)
            if clt_loop.done():
                break
            # Maintain 1Hz tick rate
            sleep(1 - (time() - curr_time))
Why threads instead of asyncio?
Python's Global Interpreter Lock (GIL) prevents true parallel execution of Python bytecode. However, threads are appropriate here (see the sketch after this list) because:
- I/O-bound operations dominate: Most tasks involve subprocess calls (external tools) or network requests
- GIL released during I/O: subprocess.run(), socket.recv(), and time.sleep() all release the GIL
- Subprocess blocking: External tools run in separate processes, unaffected by the GIL
- Simplicity: Thread-based code is more straightforward than async/await for I/O operations
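A small, self-contained illustration of the point: the sleep below stands in for a blocking external tool, and because sleep releases the GIL, three workers overlap the three fake scans instead of running them back to back:

from concurrent.futures import ThreadPoolExecutor
from time import sleep, time

def fake_scan(target: str) -> str:
    sleep(1)   # stand-in for a blocking subprocess or network call
    return f'{target}: done'

start = time()
with ThreadPoolExecutor(max_workers=3) as executor:
    results = list(executor.map(fake_scan, ['10.0.0.1', '10.0.0.2', '10.0.0.3']))
print(results)
print(f'elapsed: {time() - start:.1f}s')   # ~1s rather than ~3s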
ThreadPoolExecutor vs. alternatives:
| Approach | GIL Impact | Suitable for | Complexity |
|---|---|---|---|
| Threading | Released during I/O | I/O-bound tasks | Low |
| Multiprocessing | No GIL (separate processes) | CPU-bound tasks | Medium |
| asyncio | Single-threaded, cooperative | High-concurrency I/O | High |
Memory overhead per thread:
- Thread stack size: 1-8 MB (OS-dependent)
- Python thread state: ~10 KB
- Total for 3 workers: 3-24 MB
This is acceptable overhead for the workload. Tasks are I/O-bound (waiting on external tools), so threads spend most time blocked, not consuming CPU.
Task Scheduling Algorithm
Tasks run on fixed delay intervals:
class TaskController:
    def __init__(self):
        self.tasks = []
        self.running_tasks = {}

    def add_task(self, task: Task):
        """Register task with next execution time"""
        task.next_run = time() + task.delay
        self.tasks.append(task)

    def check_available_tasks(self, executor, current_time):
        """Schedule tasks that are due"""
        for task in self.tasks:
            if task.enabled and current_time >= task.next_run:
                if task.id not in self.running_tasks:
                    # Submit to thread pool
                    future = executor.submit(task.func)
                    self.running_tasks[task.id] = {
                        'future': future,
                        'task': task
                    }
                    # Schedule next execution
                    task.next_run = current_time + task.delay

    def check_running_tasks(self, logger):
        """Check for completed tasks"""
        completed = []
        for task_id, info in self.running_tasks.items():
            if info['future'].done():
                try:
                    result = info['future'].result()
                    logger.debug(f'Task {task_id} completed: {result}')
                except Exception as e:
                    logger.error(f'Task {task_id} failed: {e}')
                completed.append(task_id)
        # Remove completed tasks
        for task_id in completed:
            del self.running_tasks[task_id]
Scheduling characteristics:
- Type: Fixed interval with overlap prevention (next_run is reset when a task is submitted, and the same task never overlaps itself)
- Drift: Accumulates only when task execution exceeds the delay
- Overlap prevention: Tasks with same ID won't run concurrently
- Complexity: O(t) per check where t = number of tasks (typically < 20)
Fixed delay vs. fixed rate:
Fixed delay (next run anchored to the previous completion):
Task runs at: t=0, t=5, t=10, t=15
Execution takes 2s each
Actual times: 0, 7, 14, 21 (next = last_completion + delay)
Fixed rate (alternative):
Task runs at: t=0, t=5, t=10, t=15
Execution takes 2s each
Actual times: 0, 5, 10, 15 (scheduled independently)
Could queue if execution > interval
In practice, Ngoto's scheduler behaves like fixed rate while executions are shorter than the delay, but because a task is never resubmitted while a previous run is still in flight, long runs never queue: if a port scan takes 5 minutes, the next scan starts after the current one completes rather than piling up during execution.
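The difference between the two policies comes down to where the next run time is anchored; a minimal sketch (not Ngoto's code) of the two rescheduling rules:

def fixed_rate_next(scheduled_start: float, delay: float) -> float:
    """Next run is anchored to the previous *scheduled* start time."""
    return scheduled_start + delay

def fixed_delay_next(completed_at: float, delay: float) -> float:
    """Next run is anchored to the previous *completion* time."""
    return completed_at + delay

# delay = 5s, each execution takes 2s
start, delay, duration = 0.0, 5.0, 2.0
rate_starts, delay_starts = [start], [start]
for _ in range(3):
    rate_starts.append(fixed_rate_next(rate_starts[-1], delay))
    delay_starts.append(fixed_delay_next(delay_starts[-1] + duration, delay))

print('fixed rate :', rate_starts)    # [0.0, 5.0, 10.0, 15.0]
print('fixed delay:', delay_starts)   # [0.0, 7.0, 14.0, 21.0]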
Race Conditions and Thread Safety
Potential race condition:
# Multiple threads accessing self.tasks
self.tasks.check_available_tasks(executor, curr_time) # Thread 1
self.tasks.check_running_tasks(self.logger) # Thread 2
Why this is safe:
- Separate data structures: check_available_tasks reads the self.tasks list, while check_running_tasks modifies the self.running_tasks dict
- Single writer: The main loop thread is the only writer to both structures
- GIL protection: Dictionary and list operations are atomic at the Python level
Not thread-safe operations:
# If multiple threads could call this:
self.tasks.append(new_task) # NOT thread-safe without lock
Python's list append() is atomic, but iterating while appending is not:
for task in self.tasks:        # Thread A iterating
    ...
self.tasks.append(new)         # Thread B appending → iteration contents become
                               # unpredictable; unsafe without a lock
Thread safety in Ngoto:
Since only the main loop thread modifies self.tasks and self.running_tasks, no locks are needed. Task execution threads only read task parameters and write to logger (which has its own thread safety via list append).
Logger thread safety:
class Logging:
    def __init__(self):
        self.log_buffer = []   # Shared between threads

    def info(self, message: str, program: str = 'System'):
        log_entry = (program, message)
        # List append is atomic in CPython
        self.log_buffer.append(log_entry)
Why no lock?
CPython's list.append() is atomic because:
- Implemented in C, not bytecode
- Doesn't release GIL during operation
- Single operation (not read-modify-write)
However, this is an implementation detail. For production systems, use threading.Lock():
import threading

class Logging:
    def __init__(self):
        self.log_buffer = []
        self.lock = threading.Lock()

    def info(self, message: str, program: str = 'System'):
        log_entry = (program, message)
        with self.lock:
            self.log_buffer.append(log_entry)
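Another option, shown here as a sketch rather than Ngoto's implementation, is to hand entries to a queue.Queue, which the standard library documents as thread-safe, and drain it from the main loop:

import queue

class QueueLogging:
    """Hypothetical logger variant backed by a thread-safe queue."""
    def __init__(self):
        self._queue = queue.Queue()

    def info(self, message: str, program: str = 'System'):
        self._queue.put((program, message))   # safe to call from any thread

    def drain(self) -> list:
        """Collect pending entries (intended to be called from one consumer)."""
        entries = []
        try:
            while True:
                entries.append(self._queue.get_nowait())
        except queue.Empty:
            pass
        return entries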
CLI Loop: Recursion vs. Iteration
The CLI input loop uses recursion:
def clt(self):
    """CLI input loop (recursive)"""
    option = rich_console.input('\n[Ngoto] > ').split()
    if not option:
        pass
    elif not self.run_command(option[0], option):
        rich_console.print_output("Unknown command")
    self.clt()  # Tail recursion
Why recursion instead of while True?
This is a design choice with trade-offs:
Recursive approach (current):
def clt(self):
    process_input()
    self.clt()
Iterative approach:
def clt(self):
    while True:
        process_input()
Trade-offs:
✅ Recursion advantages:
- Each input gets fresh stack frame
- Easy to inject state changes between iterations
- Natural for functional programming patterns
❌ Recursion disadvantages:
- Stack overflow after ~1000 calls (Python recursion limit)
- More memory per call (~1KB per frame)
- Potential for stack exhaustion in long sessions
Python recursion limit:
import sys
sys.getrecursionlimit() # Default: 1000
After 1000 commands, Ngoto would crash with RecursionError. For a pentesting tool used over hours or days, this is problematic.
Solution: Use while True for production systems:
def clt(self):
    """CLI input loop (iterative)"""
    while True:
        try:
            option = rich_console.input('\n[Ngoto] > ').split()
            if not option:
                continue
            elif not self.run_command(option[0], option):
                rich_console.print_output("Unknown command")
        except KeyboardInterrupt:
            break
This eliminates stack growth and handles interrupts gracefully.
Technical justification for current design:
The recursive approach likely remains from early prototyping where it enabled clean code structure. For production use, refactoring to iteration would be advisable.
Command Dispatch: Algorithm and Performance
Dispatch Mechanism
Command resolution uses linear search through registered commands:
def run_command(self, command: str, options: list = []) -> bool:
    """Execute command by name or alias"""
    # Handle numeric input for navigation
    if command.isdigit():
        num = int(options[0]) - 1
        if num < self.curr_pos.num_children:
            command = 'openFolder'
        elif num < self.curr_pos.num_children + self.curr_pos.num_plugins:
            command = 'openPlugin'
    # Linear search through commands
    for cmd in self.commands:
        if command in cmd.aliases or command == cmd.name:
            pos = cmd.func(self, self.logger, options)
            if pos:
                self.curr_pos = pos
            return True
    return False
Complexity analysis:
- Numeric check: O(1)
- Command lookup: O(c × a) where c = commands, a = aliases per command
- Typical: c = 20, a = 3 → 60 comparisons worst case
- Average: 30 comparisons (assuming uniform distribution)
Performance in practice:
String comparison itself is a fast C-level operation; the dominant cost is the interpreter loop around it. For 20 commands with 3 aliases each:
- Worst case: 60 name/alias checks, on the order of a few microseconds in CPython
- Average case: roughly half that
Negligible compared to human input speed (hundreds of milliseconds between commands).
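The arithmetic above is easy to sanity-check with timeit; exact numbers vary by machine and interpreter, but the conclusion (dispatch cost far below human input latency) holds either way:

from timeit import timeit

commands = [
    {'name': f'cmd{i}', 'aliases': [f'c{i}', f'x{i}', f'y{i}']}
    for i in range(20)
]

def dispatch(name='cmd19'):   # worst case: the last registered command
    for cmd in commands:
        if name in cmd['aliases'] or name == cmd['name']:
            return cmd
    return None

runs = 100_000
total = timeit(dispatch, number=runs)
print(f'{total / runs * 1e9:.0f} ns per dispatch')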
Alternative Dispatch Strategies
Hash map (O(1) lookup):
class Ngoto:
    def __init__(self):
        self.command_map = {}  # name/alias -> Command object

    def load_cogs(self, cogs):
        # `commands` would come from the same reflection step shown earlier
        for cmd in commands:
            self.command_map[cmd.name] = cmd
            for alias in cmd.aliases:
                self.command_map[alias] = cmd

    def run_command(self, command: str, options: list):
        if command in self.command_map:
            cmd = self.command_map[command]
            pos = cmd.func(self, self.logger, options)
            return True
        return False
Trade-offs:
✅ O(1) lookup vs. O(n)
❌ Extra memory (dict overhead ~240 bytes per entry)
❌ More complex loading logic
❌ Duplicate Command references in memory
When is optimization worthwhile?
For c commands with a aliases each:
- Linear: O(c × a) per lookup
- HashMap: O(1) per lookup, but O(c × a) space
Optimization is worthwhile roughly when cumulative lookup cost (lookup frequency × c × a comparisons) outweighs the added space and loading complexity.
For Ngoto: ~1 lookup/second, 20 commands → linear search is fine.
Trie-based prefix matching:
For autocomplete or partial matching:
class CommandTrie:
    def __init__(self):
        self.root = {}

    def insert(self, command, cmd_obj):
        node = self.root
        for char in command:
            if char not in node:
                node[char] = {}
            node = node[char]
        node['$'] = cmd_obj

    def search_prefix(self, prefix):
        node = self.root
        for char in prefix:
            if char not in node:
                return []
            node = node[char]
        # _collect_all (omitted) would gather every command stored below this node
        return self._collect_all(node)
Useful for tab completion, but adds complexity for minimal benefit in Ngoto's use case.
Design Patterns and Architectural Analysis
Command Pattern
Ngoto implements the Command pattern (Gang of Four):
Structure:
Command (interface)
├── ConcreteCommand (e.g., ClearCommand)
│ ├── execute()
│ └── receiver (reference to Ngoto instance)
└── Invoker (command dispatcher)
Implementation:
class Command:
    def __init__(self, name, aliases, desc):
        self.name = name
        self.aliases = aliases
        self.desc = desc
        self.func = None  # The command logic

    def execute(self, ngoto_instance, logger, options):
        return self.func(ngoto_instance, logger, options)
Benefits:
- Decouples command invocation from execution
- Enables command history and undo/redo (not implemented, but the architecture supports it; see the sketch after this list)
- Commands are first-class objects (can be passed, stored, serialized)
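Because commands are first-class objects, a history or replay layer could wrap the dispatcher without touching the commands themselves; a hypothetical sketch (CommandHistory is not part of Ngoto):

class CommandHistory:
    """Records executed commands so they can be replayed or inspected."""
    def __init__(self):
        self._entries = []

    def record(self, cmd, options):
        # Store the Command object together with a copy of its arguments
        self._entries.append((cmd, list(options)))

    def replay(self, ngoto, logger):
        for cmd, options in self._entries:
            cmd.func(ngoto, logger, options)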
Strategy Pattern
Plugin execution uses the Strategy pattern:
class Plugin:
    def __init__(self, name, desc, folder):
        self.name = name
        self.desc = desc
        self.folder = folder
        self.func = None  # Strategy to execute

    def execute(self, logger):
        return self.func(self, logger)
Each plugin encapsulates a different reconnaissance/exploitation strategy. The framework doesn't care about implementation details—just that plugins conform to the interface.
Observer Pattern (Implicit)
The logging system implicitly implements Observer:
# Logger is observable
logger.info('Event occurred', program='Test')

# Multiple observers could subscribe
# (not implemented but architecture supports)
class LogObserver:
    def update(self, log_entry):
        # React to log event
        pass
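Making the relationship explicit would only take a subscriber list on the logger; a hypothetical sketch (ObservableLogger is illustrative, not part of Ngoto):

class ObservableLogger:
    """Logger variant that notifies registered observers."""
    def __init__(self):
        self.log_buffer = []
        self._observers = []

    def subscribe(self, observer):
        self._observers.append(observer)

    def info(self, message: str, program: str = 'System'):
        log_entry = (program, message)
        self.log_buffer.append(log_entry)
        for observer in self._observers:
            observer.update(log_entry)   # e.g. write to a file, push to a UI pane

class PrintObserver:
    def update(self, log_entry):
        program, message = log_entry
        print(f'[{program}] {message}')

log = ObservableLogger()
log.subscribe(PrintObserver())
log.info('Event occurred', program='Test')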
Composite Pattern
The Node tree structure implements Composite:
class Node:
    """Composite that can contain other Nodes or Plugins"""
    def __init__(self, name):
        self.children = []  # Other composites
        self.plugins = []   # Leaf nodes
This allows treating individual plugins and groups of plugins uniformly.
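The practical payoff is that one recursive walk can render folders and plugins together; a minimal sketch, assuming nodes expose name, children, and plugins as in the earlier Node class (SimpleNamespace stands in for real Node/Plugin objects):

from types import SimpleNamespace as NS

def display(node, depth=0):
    """Recursively print a node subtree: child folders first, then plugins."""
    indent = '    ' * depth
    print(f'{indent}{node.name}/')
    for child in node.children:      # composite children: recurse
        display(child, depth + 1)
    for plugin in node.plugins:      # leaf plugins: print in place
        print(f'{indent}    {plugin.name}')

# Tiny stand-in tree (any objects with name/children/plugins work)
dns = NS(name='DNS', children=[], plugins=[NS(name='Zone Transfer')])
net = NS(name='Network', children=[dns], plugins=[])
root = NS(name='Reconnaissance', children=[net], plugins=[])
display(root)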
Performance Characteristics and Bottlenecks
System-Wide Profiling
CPU usage:
- Idle (waiting for input): <1% CPU
- Task execution: Depends on subprocess (external tool)
- Navigation: ~0.1ms per command dispatch
Memory usage:
- Base process: ~15-20 MB (Python interpreter + imports)
- Per cog: ~5-10 KB
- Per decorator: ~300 bytes
- Total for 50 cogs, 200 commands: ~25 MB
Bottlenecks:
- External tool execution: Dominates runtime (seconds to minutes)
- Subprocess creation: ~5-10ms per subprocess.run() call
- I/O operations: File writes, network requests
- Terminal rendering: Rich library formatting (~1-5ms per print)
Not bottlenecks:
- Command dispatch: a few microseconds
- Tree navigation: ~100ns
- Decorator loading: ~10ms total (one-time cost)
Scalability Analysis
Scaling dimensions:
| Dimension | Current | Limit | Bottleneck |
|---|---|---|---|
| Commands | 20 | ~1000 | Linear search in dispatch |
| Plugins | 100 | ~10,000 | Tree depth, terminal rendering |
| Tree depth | 3 | ~10 | User navigation complexity |
| Tasks | 5 | ~50 | Thread pool saturation |
| Concurrent tasks | 3 | ~100 | Thread overhead, GIL |
When would optimization be needed?
- > 100 commands: Switch to hash-based dispatch
- > 1000 plugins: Implement pagination in terminal output
- > 20 concurrent tasks: Consider asyncio or multiprocessing
- > 10 tree depth: Flatten hierarchy or add shortcuts
For typical penetration testing use (20-50 tools), current architecture is appropriate.
Practical Penetration Testing Examples
Example 1: Network Reconnaissance Suite
from ngoto import plugin, command, task, Ngoto
import subprocess
import os

class NetworkRecon:
    """Network reconnaissance and enumeration tools"""

    @plugin(name='Nmap Full Scan', desc='Comprehensive port scan', folder='Recon/Network')
    def nmap_full(self, logger):
        """Full TCP scan with service detection"""
        target = input("Enter target: ")
        output = input("Output file: ")
        logger.info(f'Starting full scan of {target}', program='Nmap')
        cmd = f"nmap -sV -sC -O -p- {target} -oA {output}"
        subprocess.run(cmd, shell=True)
        logger.info(f'Scan complete. Output saved to {output}', program='Nmap')
        return False

    @plugin(name='SMB Enumeration', desc='Enumerate SMB shares', folder='Recon/Network')
    def smb_enum(self, logger):
        """SMB share and user enumeration"""
        target = input("Enter target: ")
        logger.info(f'Enumerating SMB on {target}', program='Enum4linux')
        cmd = f"enum4linux -a {target}"
        result = subprocess.run(cmd, shell=True, capture_output=True, text=True)
        logger.info(result.stdout, program='Enum4linux')
        return False

    @task(name='HostMonitor', desc='Monitor target host availability', delay=120, id='host_mon')
    def host_monitor(self):
        """Ping targets to check availability"""
        if os.path.exists('targets.txt'):
            with open('targets.txt', 'r') as f:
                targets = [line.strip() for line in f if line.strip()]
            for target in targets:
                response = os.system(f"ping -c 1 {target} > /dev/null 2>&1")
                status = "UP" if response == 0 else "DOWN"
                self.logger.info(f'{target}: {status}', program='Monitor')
        return 'Host check complete'

ngoto = Ngoto()
ngoto.load_cogs([NetworkRecon()])
ngoto.start()
Example 2: Web Application Testing Framework
from ngoto import plugin, task, Ngoto
import subprocess
import requests

class WebAppTesting:
    """Web application vulnerability assessment tools"""

    @plugin(name='SQLMap', desc='SQL injection testing', folder='Web/Exploitation')
    def sqlmap_test(self, logger):
        """Automated SQL injection detection"""
        url = input("Enter target URL: ")
        param = input("Enter parameter to test: ")
        logger.info(f'Testing {url} parameter {param}', program='SQLMap')
        cmd = f"sqlmap -u '{url}' -p {param} --batch --risk=3 --level=5"
        subprocess.run(cmd, shell=True)
        logger.info('SQL injection testing complete', program='SQLMap')
        return False

    @plugin(name='Directory Bruteforce', desc='Enumerate directories', folder='Web/Recon')
    def dir_bruteforce(self, logger):
        """Directory and file enumeration"""
        url = input("Enter target URL: ")
        wordlist = input("Enter wordlist path: ")
        extensions = input("Enter extensions (e.g., php,html): ")
        logger.info(f'Enumerating {url}', program='Gobuster')
        cmd = f"gobuster dir -u {url} -w {wordlist} -x {extensions}"
        subprocess.run(cmd, shell=True)
        logger.info('Directory enumeration complete', program='Gobuster')
        return False

ngoto = Ngoto()
ngoto.load_cogs([WebAppTesting()])
ngoto.start()
Comparison with Alternative Architectures
Plugin System Comparison
| Framework | Discovery | Loading | Isolation | Complexity |
|---|---|---|---|---|
| Ngoto | Reflection | Import-time | Shared process | Low |
| Pluggy | Entry points | Explicit registration | Shared process | Medium |
| Stevedore | Entry points | setuptools | Separate packages | High |
| IPC-based | Network/pipes | Runtime | Separate processes | Very high |
Ngoto's approach (reflection-based):
✅ Zero configuration for local scripts
✅ Simple development workflow
✅ Fast loading (no IPC overhead)
❌ No isolation (plugin crash → app crash)
❌ Requires Python source access
Entry point systems (Pluggy, Stevedore):
✅ Third-party plugins without code access
✅ Versioned plugin dependencies
✅ Standard Python packaging
❌ Requires package installation
❌ Slower loading (setuptools overhead)
❌ More complex for simple scripts
IPC-based systems:
✅ Complete isolation (crash-safe)
✅ Language-agnostic plugins
✅ Security boundaries
❌ High latency (IPC overhead)
❌ Complex protocol design
❌ Debugging difficulty
When to use each:
- Ngoto: Rapid prototyping, personal tools, script organization
- Entry points: Distributable packages, third-party plugins
- IPC: Security-critical, multi-language, fault tolerance required
Task Scheduling Comparison
| System | Type | Overhead | Precision | Use Case |
|---|---|---|---|---|
| Ngoto | Fixed interval, threads | Low | ~1s | Background monitoring |
| APScheduler | Cron-like, threads | Medium | ~1s | Complex scheduling |
| Celery | Distributed, workers | High | ~100ms | Production job queues |
| asyncio | Cooperative, single-thread | Very low | ~1ms | High-concurrency I/O |
Trade-off analysis:
Ngoto's simple loop-based scheduling:
- Predictable: Fixed tick rate, no drift correction
- Lightweight: No database, no message queue
- Limited: No distributed execution, no priority queues
For penetration testing (< 20 background tasks, non-critical timing), this is appropriate. Production systems with hundreds of tasks would need more sophisticated scheduling.
Installation and Usage
PyPI Installation
pip install ngoto
Basic Usage
from ngoto import plugin, command, task, Ngoto

class Basic:
    @plugin(name='Tester', desc='Tester Plugin', folder='Random')
    def tester(self, logger):
        logger.info('Plugin ran', program='Test')

    @command(name='test', aliases=['t'], desc='Tests command')
    def test(self, logger, options):
        logger.info('Command ran', program='Test')

    @task(name='TaskTest', desc="Tests task creation", delay=3, id='test')
    def testing(self):
        self.logger.info('Task logger test ran', program='Test')
        return 'Task ran'

ngoto = Ngoto()
ngoto.load_cogs([Basic()])
ngoto.start()
Development Setup
git clone https://github.com/HarryLudemann/Ngoto
cd Ngoto
pip install -r requirements.txt
python main.py
Conclusion
Ngoto demonstrates how three core computer science concepts—decorator-based metaprogramming, tree-structured data organization, and thread-based concurrent execution—combine to create a practical framework for penetration testing tool orchestration.
The technical decisions reflect trade-offs appropriate for the problem domain:
Decorator pattern: Sacrifices function call simplicity for clean metadata extraction and type safety. The lambda-based approach enables reflection without function attributes or entry points.
Tree structure with linear search: O(n) child lookup is acceptable for small n (< 10 per level). At this scale a plain list is effectively as fast as a hash map and simpler to maintain.
Thread-based concurrency: Appropriate for I/O-bound tasks (subprocess calls, network requests). The GIL is not a bottleneck because tasks spend most time blocked on external tools.
Interval scheduling with overlap prevention: Prevents task queuing but allows drift when runs overrun their delay. Acceptable for monitoring tasks where precise timing isn't critical.
Recursive CLI loop: Simple but limited to ~1000 commands before stack overflow. Production systems should use iterative loops.
From a computer science perspective, Ngoto's architecture is well-suited to its constraints: small command sets (< 100), human-scale input latency (milliseconds), and I/O-dominated workloads. The code prioritizes simplicity and readability over premature optimization—appropriate for a tool where external subprocess execution dominates runtime.
The framework demonstrates that sophisticated systems don't require complex architectures. By choosing data structures and algorithms appropriate to the problem scale, Ngoto achieves good performance with minimal code complexity.
For security professionals building custom tool workflows or students learning offensive security techniques, Ngoto provides both a practical framework and an example of how to apply fundamental CS concepts—data structures, concurrency, metaprogramming—to real-world problems.