Helpers (evopt.utils)
Warning
This module is part of the internal implementation of evopt and is not intended for direct use by end users. Its API may change without notice.
Utility functions and classes for evolutionary optimization.
This module provides various utility functions and classes for supporting evolutionary optimization processes in different environments, including HPC clusters and local machines. It includes tools for environment detection, process management, file operations, dictionary manipulation, and logging.
The module supports the following execution environments:
- Local execution
- SLURM cluster execution
- PBS/Torque cluster execution
- LSF cluster execution
Example
Basic usage of the process pool manager:
>>> from evopt.utils import ProcessPoolManager
>>> manager = ProcessPoolManager(max_workers=4)
>>> executor = manager.initialize()
>>> # Use executor for parallel processing
>>> manager.cleanup()
Using the logging utility:
>>> from evopt.utils import Logger
>>> with Logger("./logs") as logger:
...     print("This will be logged to both console and file")
- class evopt.utils.ExecutionEnvironment(value)[source]
Bases: Enum
Enum representing different execution environments.
This enumeration defines the different types of execution environments that the code can run in, from local machines to various HPC systems.
- LOCAL
Local machine execution.
- SLURM
SLURM-based cluster execution.
- PBS
PBS/Torque cluster execution.
- LSF
LSF cluster execution.
- LOCAL = 1
- LSF = 4
- PBS = 3
- SLURM = 2
- class evopt.utils.Logger(log_dir: str, log_file: str = 'logfile.log')[source]
Bases: object
A simple logger that writes messages to both the terminal and a log file.
This class redirects standard output to both the console and a log file, adding timestamps to each line. It’s designed to be used as a context manager.
- terminal
The original stdout stream.
- log_path
Path to the log file.
- log
File handle for the log file.
Examples
Basic usage:
>>> with Logger("./logs", "my_run.log") as logger:
...     print("This message goes to both console and log file")
Simple message logging:
>>> logger = Logger("./logs")
>>> with logger:
...     for i in range(5):
...         print(f"Processing item {i}")
- flush() → None[source]
Flush the buffers of both the terminal and the log file.
This ensures that any buffered output is written immediately.
- Returns:
None
- write(message: str) → None[source]
Write a message to both the terminal and the log file.
Adds a timestamp to each non-empty line before writing to the log file. The original message (without timestamps) is written to the terminal.
- Parameters:
message (str) – The message to write.
- Returns:
None
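The write behaviour described above (raw text to the terminal, timestamped non-empty lines to the log) can be sketched as follows. This is a minimal illustration, not the evopt source: the class name `TeeWriter`, the timestamp format, and the use of in-memory streams are all assumptions made for the example.

```python
import io
from datetime import datetime


class TeeWriter:
    """Hypothetical stand-in for the documented write/flush behaviour."""

    def __init__(self, terminal, log):
        self.terminal = terminal  # stream that receives the raw message
        self.log = log            # stream that receives timestamped lines

    def write(self, message: str) -> None:
        # The terminal gets the message exactly as it was passed in.
        self.terminal.write(message)
        # The log gets a timestamp in front of every non-empty line.
        for line in message.splitlines(keepends=True):
            if line.strip():
                stamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")  # assumed format
                self.log.write(f"[{stamp}] {line}")
            else:
                self.log.write(line)

    def flush(self) -> None:
        # Flush both targets so buffered output appears immediately.
        self.terminal.flush()
        self.log.flush()


# In-memory streams stand in for sys.stdout and the log file.
terminal, log = io.StringIO(), io.StringIO()
tee = TeeWriter(terminal, log)
tee.write("first line\n\nsecond line\n")
tee.flush()
```

Because `print` typically issues a separate `write("\n")` call, treating blank lines as "no timestamp" keeps the log free of lines that contain only a timestamp.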
- class evopt.utils.ProcessPoolManager(max_workers: int = 1, cores_per_worker: int = 1, memory_gb_per_worker: float = 4, wall_time: str = '01:00:00', qos: str | None = None)[source]
Bases: object
Manages process pools across different execution environments.
This class provides a unified interface for parallel processing across different computing environments (local machine or HPC clusters). It automatically detects the environment and sets up the appropriate executor for parallel task processing.
- env
The detected execution environment.
- cores_per_worker
Number of CPU cores allocated to each worker.
- memory_gb_per_worker
Memory (GB) allocated to each worker.
- wall_time
Maximum job runtime for HPC jobs.
- qos
Quality of Service for SLURM jobs.
- max_workers
Maximum number of concurrent workers.
Examples
Basic usage on a local machine:
>>> manager = ProcessPoolManager(max_workers=4)
>>> with manager.initialize() as executor:
...     results = list(executor.map(my_function, my_inputs))
Usage with more specific resource requirements:
>>> import concurrent.futures
>>> manager = ProcessPoolManager(
...     max_workers=8,
...     cores_per_worker=4,
...     memory_gb_per_worker=16,
...     wall_time="08:00:00"
... )
>>> executor = manager.initialize()
>>> try:
...     futures = [executor.submit(my_function, arg) for arg in my_inputs]
...     for future in concurrent.futures.as_completed(futures):
...         result = future.result()
... finally:
...     manager.cleanup()
- check_workers_health()[source]
Check whether workers are healthy and replace any dead ones.
- cleanup()[source]
Clean up resources used by the executor.
Shuts down the executor if it exists and cancels any pending SLURM jobs. This method should be called when the executor is no longer needed to ensure proper resource cleanup.
- Returns:
None
Example
>>> manager = ProcessPoolManager(max_workers=4)
>>> executor = manager.initialize()
>>> try:
...     # Use executor for parallel processing
...     pass
... finally:
...     manager.cleanup()
- initialize()[source]
Initialize the appropriate executor based on the execution environment.
This method detects the current execution environment and creates the appropriate type of executor. For local environments, it creates a ProcessPoolExecutor. For SLURM environments, it creates a SlurmExecutor if available, otherwise falls back to local processing.
- Returns:
The initialized executor object, or None if max_workers <= 1 (serial processing).
- Return type:
concurrent.futures.Executor or None
Example
>>> manager = ProcessPoolManager(max_workers=4)
>>> executor = manager.initialize()
>>> if executor:
...     futures = [executor.submit(func, i) for i in range(10)]
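Since initialize() may return None for serial processing, calling code needs a fallback path. One way a caller might handle both cases is sketched below. The helper `run_tasks` is hypothetical, and a standard-library `ThreadPoolExecutor` stands in for whatever executor evopt would return so that the example runs without evopt installed.

```python
from concurrent.futures import Executor, ThreadPoolExecutor
from typing import Callable, Iterable, List, Optional


def run_tasks(executor: Optional[Executor], func: Callable, items: Iterable) -> List:
    """Hypothetical helper: run func over items, serially if executor is None."""
    if executor is None:
        # Serial fallback, matching the documented "None means serial" contract.
        return [func(item) for item in items]
    futures = [executor.submit(func, item) for item in items]
    # Collect results in submission order.
    return [future.result() for future in futures]


def square(x: int) -> int:
    return x * x


serial = run_tasks(None, square, range(5))
with ThreadPoolExecutor(max_workers=2) as pool:  # stand-in executor
    parallel = run_tasks(pool, square, range(5))
```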
- class evopt.utils.SlurmJobManager[source]
Bases: object
Manages job submissions to SLURM workload manager.
This class provides static methods for submitting, monitoring, and canceling jobs on SLURM-based HPC clusters. It handles script creation, job submission, and job status monitoring.
Examples
Submit a simple job:
>>> job_id = SlurmJobManager.submit_job(
...     "python my_script.py",
...     "optimization",
...     4,
...     "./output"
... )
>>> print(f"Job submitted with ID: {job_id}")
Wait for job completion:
>>> SlurmJobManager.wait_for_job(job_id)
>>> print("Job completed")
- static cancel_job(job_id: int) → None[source]
Cancel a running SLURM job.
Sends a cancellation request for the specified job ID.
- Parameters:
job_id – The SLURM job ID to cancel.
- Returns:
None
Example
>>> job_id = SlurmJobManager.submit_job(...)
>>> # Cancel the job if needed
>>> SlurmJobManager.cancel_job(job_id)
- static submit_job(script_content: str, job_name: str, cpus_per_task: int, output_dir: str, memory_gb: float | None = None, wall_time: str = '01:00:00', qos: str | None = None) → int[source]
Submit a job to the SLURM scheduler.
Creates a temporary SLURM batch script with the specified parameters and submits it to the SLURM workload manager.
- Parameters:
script_content – The script content to be executed.
job_name – Name for the SLURM job.
cpus_per_task – Number of CPUs to allocate per task.
output_dir – Directory for SLURM output and error files.
memory_gb – Memory allocation in gigabytes. Default is None (uses SLURM default).
wall_time – Maximum wall time in format “HH:MM:SS”. Default is “01:00:00”.
qos – Quality of Service to request. Default is None.
- Returns:
The SLURM job ID.
- Return type:
int
- Raises:
subprocess.SubprocessError – If job submission fails.
ValueError – If job ID cannot be parsed from SLURM response.
Example
>>> job_id = SlurmJobManager.submit_job(
...     "python run_optimization.py",
...     "evopt_job",
...     4,
...     "./slurm_logs",
...     memory_gb=16
... )
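To make the parameters concrete, the sketch below shows how such arguments could map onto `#SBATCH` directives and how a job ID could be parsed from `sbatch` output of the form "Submitted batch job <id>". The helper names, the output file naming, and the exact directive set are assumptions for illustration. The script evopt actually generates may differ.

```python
import re
from typing import Optional


def build_batch_script(script_content: str, job_name: str, cpus_per_task: int,
                       output_dir: str, memory_gb: Optional[float] = None,
                       wall_time: str = "01:00:00", qos: Optional[str] = None) -> str:
    """Hypothetical helper: render a SLURM batch script from the documented parameters."""
    lines = [
        "#!/bin/bash",
        f"#SBATCH --job-name={job_name}",
        f"#SBATCH --cpus-per-task={cpus_per_task}",
        f"#SBATCH --output={output_dir}/%j.out",  # %j expands to the job ID
        f"#SBATCH --error={output_dir}/%j.err",
        f"#SBATCH --time={wall_time}",
    ]
    if memory_gb is not None:
        # Expressed in megabytes so fractional gigabyte values survive.
        lines.append(f"#SBATCH --mem={int(memory_gb * 1024)}M")
    if qos is not None:
        lines.append(f"#SBATCH --qos={qos}")
    lines += ["", script_content, ""]
    return "\n".join(lines)


def parse_job_id(sbatch_output: str) -> int:
    """Extract the job ID, raising ValueError when it cannot be found."""
    match = re.search(r"Submitted batch job (\d+)", sbatch_output)
    if match is None:
        raise ValueError(f"Could not parse job ID from: {sbatch_output!r}")
    return int(match.group(1))


script = build_batch_script("python run_optimization.py", "evopt_job", 4,
                            "./slurm_logs", memory_gb=16)
job_id = parse_job_id("Submitted batch job 12345\n")
```

Leaving out the `--mem` and `--qos` lines when the arguments are None lets the scheduler defaults apply, which matches the documented meaning of those defaults.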
- static wait_for_job(job_id: int, check_interval: int = 10) → None[source]
Wait for a SLURM job to complete.
Polls the SLURM queue periodically until the specified job is no longer found in the queue, indicating completion.
- Parameters:
job_id – The SLURM job ID to monitor.
check_interval – Seconds to wait between checks. Default is 10.
- Returns:
None
- Raises:
subprocess.SubprocessError – If checking job status fails.
Example
>>> job_id = SlurmJobManager.submit_job(...)
>>> SlurmJobManager.wait_for_job(job_id, check_interval=30)
>>> print("Job finished")
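The polling logic described above reduces to a small loop. In the sketch below the queue check is passed in as a callable so the loop can be exercised without a scheduler. The names `wait_until_gone` and `in_queue` are hypothetical, and a real check would presumably query the SLURM queue.

```python
import time
from typing import Callable


def wait_until_gone(job_id: int, in_queue: Callable[[int], bool],
                    check_interval: float = 10,
                    sleep: Callable[[float], None] = time.sleep) -> int:
    """Hypothetical helper: block until in_queue(job_id) is False; return the wait count."""
    waits = 0
    while in_queue(job_id):
        waits += 1
        sleep(check_interval)  # pause between queue checks
    return waits


# Fake queue: the job is visible for two checks, then disappears.
answers = [True, True, False]
recorded_sleeps = []
wait_count = wait_until_gone(42, lambda job_id: answers.pop(0),
                             check_interval=30, sleep=recorded_sleeps.append)
```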
- evopt.utils.convert_to_native(value)[source]
Convert a value to a native Python type for serialization.
Converts NumPy types, nested lists, and dictionaries to their native Python equivalents. This is particularly useful for preparing data for JSON serialization or other formats that don’t support NumPy types.
- Parameters:
value (Any) – The value to convert. Can be a NumPy type, list, dict, or None.
- Returns:
The converted value in native Python format.
- Return type:
Any
Example
>>> import numpy as np
>>> data = {'value': np.float64(3.14159), 'array': np.array([1.0, 2.0, 3.0])}
>>> convert_to_native(data)
{'value': 3.142, 'array': [1.0, 2.0, 3.0]}
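A recursive conversion of this kind can be sketched without importing NumPy by relying on the `tolist()` method that NumPy arrays and scalars expose. The function name `to_native` is hypothetical, the standard-library `array` type stands in for a NumPy array so the example has no third-party dependency, and the sketch does not attempt to reproduce any rounding the real function may apply.

```python
import json
from array import array


def to_native(value):
    """Hypothetical sketch: recursively convert containers and array-likes."""
    if value is None:
        return None
    if isinstance(value, dict):
        return {key: to_native(item) for key, item in value.items()}
    if isinstance(value, (list, tuple)):
        return [to_native(item) for item in value]
    # NumPy arrays and scalars (and array.array) provide tolist().
    tolist = getattr(value, "tolist", None)
    if callable(tolist):
        return to_native(tolist())
    return value


data = {"value": 3.5, "array": array("d", [1.0, 2.0, 3.0]), "nested": [array("i", [1, 2]), None]}
native = to_native(data)
encoded = json.dumps(native)  # succeeds because only built-in types remain
```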
- evopt.utils.detect_environment() → ExecutionEnvironment[source]
Detect the current execution environment based on environment variables.
This function examines environment variables to determine whether the code is running on a local machine or in a cluster environment (SLURM, PBS, LSF).
- Returns:
The detected execution environment.
- Return type:
ExecutionEnvironment
Example
>>> env = detect_environment()
>>> if env == ExecutionEnvironment.SLURM:
...     print("Running in a SLURM environment")
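Environment-variable detection of this kind might look like the sketch below. The enum is a local stand-in that mirrors the documented values. The variables checked (`SLURM_JOB_ID`, `PBS_JOBID`, `LSB_JOBID`) are ones the respective schedulers set for running jobs, but whether evopt inspects exactly these is an assumption.

```python
import os
from enum import Enum
from typing import Mapping, Optional


class ExecutionEnvironment(Enum):
    """Local stand-in mirroring the documented enum values."""
    LOCAL = 1
    SLURM = 2
    PBS = 3
    LSF = 4


def guess_environment(environ: Optional[Mapping[str, str]] = None) -> ExecutionEnvironment:
    """Hypothetical sketch: infer the scheduler from job-ID variables."""
    env = os.environ if environ is None else environ
    if "SLURM_JOB_ID" in env:  # assumed marker for SLURM
        return ExecutionEnvironment.SLURM
    if "PBS_JOBID" in env:     # assumed marker for PBS/Torque
        return ExecutionEnvironment.PBS
    if "LSB_JOBID" in env:     # assumed marker for LSF
        return ExecutionEnvironment.LSF
    return ExecutionEnvironment.LOCAL


slurm_env = guess_environment({"SLURM_JOB_ID": "12345"})
local_env = guess_environment({})
```

Accepting an optional mapping instead of always reading `os.environ` makes the function easy to test outside a cluster.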
- evopt.utils.extend_dict(master_dict: dict, slave_dict: dict) → None[source]
Merge dictionary keys and extend values as lists.
If a key exists in both dictionaries, the values from slave_dict are appended to the list of values in master_dict. If a key only exists in slave_dict, it is added to master_dict.
This function modifies master_dict in-place.
- Parameters:
master_dict (dict) – The dictionary to extend (modified in-place).
slave_dict (dict) – The dictionary containing values to add.
- Returns:
The master_dict is modified in-place.
- Return type:
None
Example
>>> master = {'a': [1, 2], 'b': 3}
>>> slave = {'a': 4, 'b': [5, 6], 'c': 7}
>>> extend_dict(master, slave)
>>> print(master)
{'a': [1, 2, 4], 'b': [3, 5, 6], 'c': 7}
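One merge rule that reproduces the example output is sketched below: scalars are wrapped in a list before concatenation, and keys found only in the second dictionary are copied over unchanged. The function name `extend_dict_sketch` is hypothetical, and the real function may treat other cases (for example tuples or nested dictionaries) differently.

```python
def extend_dict_sketch(master: dict, slave: dict) -> None:
    """Hypothetical sketch: merge slave into master in place, extending shared keys."""
    for key, value in slave.items():
        if key not in master:
            # New keys are copied as they are, without list wrapping.
            master[key] = value
            continue
        current = master[key] if isinstance(master[key], list) else [master[key]]
        extra = value if isinstance(value, list) else [value]
        master[key] = current + extra


master = {"a": [1, 2], "b": 3}
slave = {"a": 4, "b": [5, 6], "c": 7}
result = extend_dict_sketch(master, slave)
```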
- evopt.utils.format_array(arr, precision=3)[source]
Format a numpy array into a string with a specified precision.
Creates a comma-separated string representation of the array values, with each value formatted to the specified precision.
- Parameters:
arr (np.ndarray or list) – The array to format.
precision (int, optional) – The number of decimal places to include. Default is 3.
- Returns:
A string representation of the array.
- Return type:
str
Example
>>> import numpy as np
>>> arr = np.array([1.2345, 5.6789, 9.8765])
>>> format_array(arr, precision=2)
'1.23, 5.68, 9.88'
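The formatting described here amounts to a fixed-precision format joined with commas. A minimal sketch follows, using a plain list so that NumPy is not required. The name `format_values` is hypothetical.

```python
def format_values(values, precision: int = 3) -> str:
    """Hypothetical sketch: comma-separated values with fixed decimal places."""
    return ", ".join(f"{float(value):.{precision}f}" for value in values)


two_places = format_values([1.2345, 5.6789, 9.8765], precision=2)
default_places = format_values([0.5, 2])
```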
- evopt.utils.get_available_cpus() → int[source]
Get number of CPUs available, accounting for HPC environment variables.
This function determines the number of available CPUs based on the current execution environment, checking relevant environment variables for different HPC systems before falling back to the system CPU count.
- Returns:
The number of available CPUs.
- Return type:
int
Example
>>> num_cpus = get_available_cpus()
>>> print(f"Using {num_cpus} CPUs for computation")
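The lookup order described above (scheduler variables first, system count as a fallback) could be sketched as follows. The variable names `SLURM_CPUS_PER_TASK`, `PBS_NP`, and `LSB_DJOB_NUMPROC` are assumptions about what evopt consults, and the function name `available_cpus` is hypothetical.

```python
import os
from typing import Mapping, Optional


def available_cpus(environ: Optional[Mapping[str, str]] = None) -> int:
    """Hypothetical sketch: prefer scheduler-provided CPU counts over os.cpu_count()."""
    env = os.environ if environ is None else environ
    for name in ("SLURM_CPUS_PER_TASK", "PBS_NP", "LSB_DJOB_NUMPROC"):  # assumed variables
        raw = env.get(name)
        # Ignore missing, non-numeric, or zero values and keep looking.
        if raw and raw.isdigit() and int(raw) > 0:
            return int(raw)
    # os.cpu_count() can return None, so fall back to a single CPU.
    return os.cpu_count() or 1


from_slurm = available_cpus({"SLURM_CPUS_PER_TASK": "8"})
from_system = available_cpus({})
```

Preferring the scheduler's value matters on shared nodes, where the system CPU count can be larger than the allocation granted to the job.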
- evopt.utils.worker_init()[source]
Initialize worker processes as EVOPT_WORKER.
- evopt.utils.working_directory(path)[source]
A context manager for temporarily changing the working directory.
This context manager allows for temporarily changing the working directory and automatically restoring it when exiting the context.
- Parameters:
path – Path to the directory to change to.
- Yields:
None
Example
>>> with working_directory('/path/to/dir'):
...     # Do something in the new directory
...     pass
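A context manager with this behaviour is commonly written with `contextlib.contextmanager` and a `try`/`finally` block so that the original directory is restored even if the body raises. The sketch below uses the hypothetical name `change_directory` and a temporary directory for the demonstration.

```python
import os
import tempfile
from contextlib import contextmanager


@contextmanager
def change_directory(path):
    """Hypothetical sketch: switch to path, then always switch back."""
    previous = os.getcwd()
    os.chdir(path)
    try:
        yield
    finally:
        # Runs on normal exit and on exceptions alike.
        os.chdir(previous)


start = os.getcwd()
with tempfile.TemporaryDirectory() as tmp:
    with change_directory(tmp):
        inside = os.getcwd()
    restored = os.getcwd()
```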
- evopt.utils.write_to_csv(data, csv_path, sort_columns=None)[source]
Write a dictionary of data to a CSV file with optional sorting.
Appends a row of data to an existing CSV file or creates a new one. The function handles numeric conversion and supports sorting by specified columns.
- Parameters:
data (dict) – The data to write as a row in the CSV.
csv_path (str) – The path to the CSV file.
sort_columns (list, optional) – Column names to sort the data by. Default is None.
- Returns:
None
- Raises:
OSError – If the file cannot be created or written to.
Example
>>> data = {'epoch': 1, 'error': 0.023, 'param1': 0.5, 'param2': 1.2}
>>> write_to_csv(data, 'results.csv', sort_columns=['epoch'])