# Source code for evopt.sampler

import multiprocessing as mp
import numpy as np
from scipy.stats import qmc
import cloudpickle
import concurrent.futures
import os
import traceback
from .directory_manager import DirectoryManager
from .utils import ProcessPoolManager, working_directory, write_to_csv, format_array

class SobolSampler():
    """Generate Sobol samples over a bounded parameter space and evaluate them.

    Samples are drawn with ``scipy.stats.qmc.Sobol`` (scrambled), scaled to the
    per-parameter bounds in ``params``, and passed one by one to ``evaluator``.
    Evaluation runs serially when ``max_workers <= 1`` and otherwise through the
    executor returned by ``ProcessPoolManager.initialize()``. Each result is
    handed to ``write_to_csv`` together with the directory manager's
    ``results_csv`` path.
    """

    def __init__(
            self,
            params: dict,
            evaluator: callable,
            n_samples: int = 32,
            rand_seed: int = None,
            target_dict: dict = None,
            max_workers: int = 1,
            cores_per_worker: int = 1,
            base_dir: str = None,
            dir_id: str = None,
            verbose: bool = True,
            directory_manager: "DirectoryManager" = None,
            **kwargs
            ):
        """Store the configuration and set up directory and process managers.

        Args:
            params: Mapping of parameter name to a ``(lower, upper)`` bounds pair.
            evaluator: Callable invoked with a ``{name: value}`` dict per sample.
            n_samples: Number of Sobol samples to generate.
            rand_seed: Seed for the Sobol engine and for ``np.random.seed``.
            target_dict: Optional targets; when given, dict results from the
                evaluator are converted to a loss via ``calc_loss``.
            max_workers: Worker count; values ``<= 1`` select serial evaluation.
            cores_per_worker: Forwarded to ``ProcessPoolManager``.
            base_dir: Base directory, used only when no ``directory_manager``
                is supplied (falls back to the current working directory).
            dir_id: Directory id, used only when no ``directory_manager`` is
                supplied.
            verbose: Whether to print progress information.
            directory_manager: Pre-built ``DirectoryManager``. When ``None`` one
                is created from ``base_dir`` and ``dir_id``.
            **kwargs: Extra keyword arguments forwarded to ``ProcessPoolManager``.
        """
        self.params = params
        self.evaluator = evaluator
        self.n_samples = n_samples
        self.rand_seed = rand_seed
        self.target_dict = target_dict
        self.max_workers = max_workers
        self.cores_per_worker = cores_per_worker
        self.base_dir = base_dir
        self.dir_id = dir_id
        self.verbose = verbose
        self.kwargs = kwargs
        self._file_lock = mp.Lock()  # For CSV file access synchronization
        np.random.seed(rand_seed)

        # The default of None used to crash on the first attribute access below;
        # build the manager the same way the module-level sample() helper does.
        if directory_manager is None:
            directory_manager = DirectoryManager(
                base_dir=os.getcwd() if base_dir is None else base_dir,
                dir_id=dir_id,
            )
        self.dir_manager = directory_manager

        self.process_manager = ProcessPoolManager(
            max_workers=max_workers,
            cores_per_worker=cores_per_worker,
            **kwargs
        )

        # No executor means process_samples() evaluates in the calling process.
        self.executor = None if max_workers <= 1 else self.process_manager.initialize()

        # Samples whose index is found here are skipped by process_samples().
        self.completed_samples = self.dir_manager.load_sample_history()
        self.all_samples = None

    def sampler(self):
        """Generate ``n_samples`` scrambled Sobol points scaled to the bounds.

        Returns:
            The scaled sample array, one row per sample and one column per
            parameter (in ``self.params`` order). Also stored on
            ``self.all_samples``.
        """
        l_bounds = [v[0] for v in self.params.values()]
        u_bounds = [v[1] for v in self.params.values()]

        if self.verbose:
            print(f"Generating {self.n_samples} Sobol samples...")
        engine = qmc.Sobol(d=len(self.params), scramble=True, seed=self.rand_seed)
        unit_samples = engine.random(self.n_samples)
        self.all_samples = qmc.scale(unit_samples, l_bounds, u_bounds)

        return self.all_samples

    def _write_result_to_csv(
                self,
                sample_idx: int,
                param_dict: dict,
                result_dict: dict = None,
                error: float = None
        ) -> None:
        """Write a sample's results to the results CSV file.

        Builds one row from the sample index, error, parameter values and any
        additional result metrics, and passes it to ``write_to_csv`` with the
        DirectoryManager's ``results_csv`` path.

        Args:
            sample_idx: Sample index within the study.
            param_dict: Dictionary of parameter values used for this sample.
            result_dict: Optional dictionary of additional metrics from the
                evaluation. Default is None.
            error: Optional error value for the sample. Default is None.

        Note:
            This is an internal method called by process_samples().
        """
        result = {
            'sample': sample_idx,
            'error': error,
            **param_dict,
            **(result_dict or {}),
        }

        write_to_csv(result, self.dir_manager.results_csv, sort_columns=['sample'])

    def print_solution(self, sample_id: int, params: np.ndarray, error: float = None) -> None:
        """Print information about a single evaluated sample.

        Displays the sample ID, its position out of ``n_samples``, the
        parameter values and the error. Only prints if verbose mode is enabled.

        Args:
            sample_id: Sample index within the study.
            params: Array of parameter values used for this sample.
            error: Error value for the sample, or None if unavailable.
        """
        if self.verbose:
            print(f"Sample {sample_id} | ({sample_id + 1}/{self.n_samples}) | Params: [{format_array(params)}] | Error: {'None' if error is None else f'{error:.3f}'}")

    @staticmethod
    def _remove_if_empty(folder) -> None:
        """Delete ``folder`` if it is an existing, empty directory (best effort)."""
        try:
            if os.path.isdir(folder) and not os.listdir(folder):
                os.rmdir(folder)
        except OSError:
            # Cleanup is cosmetic; a failure here must not fail the evaluation.
            pass

    @classmethod
    def _evaluate_solution_worker(cls, args: tuple) -> tuple:
        """Evaluate a single sample, possibly inside a worker process.

        Class method (it does not touch instance state) so it can be submitted
        to a process pool without pickling the sampler itself.

        Args:
            args: Tuple containing:
                - sample_id (int): Sample identifier
                - params (np.ndarray): Parameter values to evaluate
                - param_names (list): List of parameter names
                - sample_folder (str): Folder for sample-specific files
                - pickled_evaluator (bytes): Cloudpickled evaluator function
                - target_dict (dict): Dictionary of target values (optional)
                - verbose (bool): Whether to print detailed information

        Returns:
            tuple: (sample_id, error, result_dict, param_dict) where:
                - sample_id (int): Sample identifier
                - error (float): Error value, or None if evaluation failed or
                  the evaluator returned a dict and no target_dict was given
                - result_dict (dict): Additional metrics from evaluation, or None
                - param_dict (dict): Dictionary of parameter values

        Note:
            This is an internal method called by process_samples().
        """
        (sample_id,
         params,
         param_names,
         sample_folder,
         pickled_evaluator,
         target_dict,
         verbose) = args

        np.random.seed(1000 + sample_id)  # Deterministic unique seed per sample

        param_dict = dict(zip(param_names, params))
        result_dict = None
        error = None

        try:
            evaluator_func = cloudpickle.loads(pickled_evaluator)
        except Exception as e:
            # Without an evaluator there is nothing to run: report the failure
            # explicitly instead of falling through to an undefined name.
            print(f"Error unpickling evaluator for solution {sample_id}: {e}")
            print(f"Traceback:\n{traceback.format_exc()}")
            cls._remove_if_empty(sample_folder)
            return sample_id, None, None, param_dict

        try:
            with working_directory(sample_folder):
                outcome = evaluator_func(param_dict)

            if isinstance(outcome, dict):
                result_dict = outcome
                if target_dict:
                    # The dict must be tested *before* it is cleared; previously
                    # this branch could never run.
                    from .loss import calc_loss  # Import here to avoid circular imports
                    loss = calc_loss(target_dict, outcome, hard_to_soft_weight=0.9, method="mae")
                    result_dict = loss.observed_dict
                    error = loss.combined_loss
            elif target_dict:
                # Targets can only be compared against a dict of observations.
                if verbose:
                    print(f"Error in solution {sample_id}: Expected dictionary, got {type(outcome)}")
                error = None
            else:
                error = outcome

        except Exception as e:
            # Evaluation failures are reported as error=None, not raised.
            error = None
            if verbose:
                print(f"Error evaluating solution {sample_id}: {e}")

        cls._remove_if_empty(sample_folder)
        return sample_id, error, result_dict, param_dict

    def _submit_all(self, sample_args: list) -> dict:
        """Submit every task to the executor, mapping future -> (sample_idx, run_idx).

        If submission fails because the pool is broken, the pool is rebuilt once
        and submission retried; any other failure (or a second one) is raised.
        """
        def submit():
            return {
                self.executor.submit(self._evaluate_solution_worker, args): (args[0], run_idx)
                for run_idx, args in enumerate(sample_args)
            }

        try:
            return submit()
        except Exception:
            print(f"Traceback:\n{traceback.format_exc()}")
            # `_broken` is a private executor flag; treat its absence as "not broken".
            if not getattr(self.executor, "_broken", False):
                raise

        print("Process pool is broken - reinitializing")
        self.process_manager.cleanup()
        self.executor = self.process_manager.initialize()
        return submit()

    def process_samples(self) -> list:
        """Evaluate all not-yet-completed Sobol samples, serially or in parallel.

        Generates the samples if needed, skips indices found in
        ``self.completed_samples``, evaluates the rest with the evaluator and
        records each result via ``_write_result_to_csv``.

        Returns:
            list: One entry per sample processed in this call, in sample order.
                  Each entry is a tuple (sample_idx, error, result_dict,
                  param_dict); a failed evaluation has ``error`` set to None.
                  An entry is None only if the worker returned None or reported
                  a mismatching sample index. Empty if nothing was left to do.

        Raises:
            RuntimeError: If sample generation produced the wrong number of samples.
            Exception: If submitting tasks to the process pool fails and the
                pool cannot be recovered.
        """
        if self.all_samples is None:
            self.sampler()
        if self.all_samples is None or len(self.all_samples) != self.n_samples:
            raise RuntimeError("Sample generation failed or produced incorrect number of samples.")

        pickled_evaluator = cloudpickle.dumps(self.evaluator)
        param_names = list(self.params.keys())

        sample_args = []
        for idx, params_array in enumerate(self.all_samples):
            if idx in self.completed_samples:
                continue

            sample_folder = self.dir_manager.create_sample_folder(idx)
            sample_args.append((
                idx,                     # sample_idx
                params_array,            # params (already scaled)
                param_names,             # param_names
                sample_folder,           # sample_folder
                pickled_evaluator,       # pickled evaluator function
                self.target_dict,        # target_dict (if any)
                self.verbose,            # verbose
            ))

        num_to_process = len(sample_args)
        if num_to_process == 0:
            print("No new samples to process.")
            return []

        run_results = [None] * num_to_process

        def store_result(result, original_idx, run_idx):
            # result expected format: (sample_idx, error, result_dict, param_dict)
            if result is None:  # Handle potential None return from worker on severe failure
                print(f"Warning: Evaluation worker returned None for sample {original_idx}")
                run_results[run_idx] = None  # Store None to indicate failure
                return

            _idx, error, result_dict, param_dict = result
            if _idx != original_idx:
                print(f"Warning: Sample index mismatch! Expected {original_idx}, got {_idx}.")
                run_results[run_idx] = None
                return

            with self._file_lock:
                self._write_result_to_csv(_idx, param_dict, result_dict=result_dict, error=error)

            run_results[run_idx] = result

            if self.verbose:
                self.print_solution(_idx, self.all_samples[original_idx], error=error)

        def store_failure(original_idx, run_idx):
            # Record the sample with its parameters but no error/result values.
            params_dict = dict(zip(param_names, self.all_samples[original_idx]))
            store_result((original_idx, None, None, params_dict), original_idx, run_idx)

        if self.executor is None:
            # Serial processing
            print("Running evaluations in serial mode.")
            for run_idx, args in enumerate(sample_args):
                original_idx = args[0]
                try:
                    result = self._evaluate_solution_worker(args)
                    store_result(result, original_idx, run_idx)
                except Exception as e:
                    print(f"Sample {original_idx} evaluation failed directly: {e}")
                    store_failure(original_idx, run_idx)

        else:
            futures = self._submit_all(sample_args)

            for future in concurrent.futures.as_completed(futures):
                original_idx, run_idx = futures[future]
                try:
                    result = future.result()
                    store_result(result, original_idx, run_idx)
                except Exception as e:
                    print(f"Sample {original_idx} evaluation failed directly: {e}")
                    store_failure(original_idx, run_idx)

        return run_results

    def cleanup(self):
        """Clean up resources, particularly the process pool."""
        # The guard covers instances whose __init__ never got as far as
        # creating the process manager.
        if hasattr(self, 'process_manager'):
            self.process_manager.cleanup()
            self.executor = None

    def __del__(self):
        """Attempt cleanup when the object is garbage collected."""
        self.cleanup()


def sample(
        params: dict,
        evaluator: callable,
        n_samples: int = 32,
        rand_seed: int = None,
        target_dict: dict = None,
        max_workers: int = 1,
        cores_per_worker: int = 1,
        base_dir: str = None,
        dir_id: str = None,
        verbose: bool = False,
        **kwargs
    ) -> list:
    """Sample Sobol sequences and evaluate them using the provided evaluator.

    Args:
        params (dict): Parameter space definition as a dictionary where keys are
            parameter names and values are tuples of (min_value, max_value) bounds.
        evaluator (callable): Function that evaluates a parameter set and returns either:
            - A single float value representing the error/fitness (lower is better)
            - A dictionary of observed values to be compared with target_dict
        n_samples (int): Number of Sobol samples to generate. Default is 32.
        rand_seed (int): Random seed for reproducibility. Default is None.
        target_dict (dict): Dictionary of target values for comparison. Default is None.
        max_workers (int): Maximum number of worker processes. Default is 1.
        cores_per_worker (int): Number of CPU cores per worker process. Default is 1.
        base_dir (str): Base directory for storing optimization results.
            If None, uses current directory.
        dir_id (str): Directory ID for organizing results. Default is None.
        verbose (bool): Whether to print detailed information during sampling
            and evaluation. Default is False.
        **kwargs: Additional keyword arguments forwarded to SobolSampler.

    Returns:
        list: A list containing the results for each sample. Each element is
            a tuple: (sample_idx, error, result_dict, param_dict).
            Returns None for samples that failed evaluation.

    Raises:
        RuntimeError: If sample generation fails or produces incorrect number of samples.
    """
    directory_manager = DirectoryManager(
        base_dir=os.getcwd() if base_dir is None else base_dir,
        dir_id=dir_id,
    )

    sampler = SobolSampler(
        params=params,
        evaluator=evaluator,
        n_samples=n_samples,
        rand_seed=rand_seed,
        target_dict=target_dict,
        max_workers=max_workers,
        cores_per_worker=cores_per_worker,
        base_dir=base_dir,
        dir_id=dir_id,
        verbose=verbose,
        directory_manager=directory_manager,
        **kwargs
    )

    try:
        # Run the evaluation inside the directory manager's logger context.
        with directory_manager.logger:
            results = sampler.process_samples()
        return results
    finally:
        # Always release the process workers, even if evaluation raised.
        sampler.cleanup()