Source code for pyfracval.main_runner

"""Core function to run the FracVAL simulation."""

import logging
import time
from typing import Any

import numpy as np

# Import necessary modules from your library
from . import particle_generation, utils
from .cca_agg import CCAggregator
from .pca_subclusters import Subclusterer
from .schemas import AggregateProperties, GenerationInfo, Metadata, SimulationParameters

logger = logging.getLogger(__name__)


[docs] def run_simulation( iteration: int, sim_config_dict: dict[str, Any], output_base_dir: str = "RESULTS", seed: int | None = None, ) -> tuple[bool, np.ndarray | None, np.ndarray | None]: """ Run one full FracVAL aggregate generation (PCA + CCA). Orchestrates the simulation pipeline: 1. Validates input parameters using `SimulationParameters`. 2. Sets random seed. 3. Generates initial particle radii (lognormal distribution). 4. Shuffles radii. 5. Performs PCA subclustering using `Subclusterer`. 6. Performs CCA aggregation using `CCAggregator` on the PCA results. 7. Calculates final aggregate properties (Rg, CM). 8. Saves results (metadata + data) using `Metadata.save_to_file`. 9. Provides enhanced error messages and suggestions on failure. Parameters ---------- iteration : int The iteration number (e.g., for generating multiple aggregates), used mainly for output filenames and metadata. sim_config_dict : dict[str, Any] Dictionary containing simulation parameters conforming to `SimulationParameters` schema (N, Df, kf, rp_g, rp_gstd, etc.). output_base_dir : str, optional Base directory to save the output `.dat` file, by default "RESULTS". seed : int | None, optional Random seed for reproducibility, by default None (time-based). Returns ------- tuple[bool, np.ndarray | None, np.ndarray | None] A tuple containing: - success_flag (bool): True if the simulation completed successfully, False otherwise. - final_coords (np.ndarray | None): Nx3 array of coordinates if successful, None otherwise. - final_radii (np.ndarray | None): N array of radii if successful, None otherwise. """ logger.info(f"===== Starting Aggregate Generation {iteration} =====") try: if seed is not None and "seed" not in sim_config_dict: sim_config_dict["seed"] = seed sim_params = SimulationParameters(**sim_config_dict) logger.info(f"Validated Config: {sim_params.model_dump_json(indent=2)}") except Exception as e: logger.error(f"Invalid simulation parameters provided: {e}", exc_info=True) return False, None, None start_time = time.time() if sim_params.seed is not None: np.random.seed(sim_params.seed) logger.info(f"Using random seed: {sim_params.seed}") # 1. Generate Initial Particle Radii try: initial_radii = particle_generation.lognormal_pp_radii( sim_params.rp_gstd, sim_params.rp_g, sim_params.N, ) logger.info( f"Generated initial radii (Mean: {np.mean(initial_radii):.2f}, Std: {np.std(initial_radii):.2f})" ) except ValueError as e: logger.error(f"Error generating radii: {e}") return False, None, None # 2. Shuffle Radii shuffled_radii = utils.shuffle_array(initial_radii.copy()) # 3. PCA Subclustering logger.info("--- Starting PCA Subclustering ---") pca_start_time = time.time() subcluster_runner = Subclusterer( initial_radii=shuffled_radii, df=sim_params.Df, kf=sim_params.kf, tol_ov=sim_params.tol_ov, n_subcl_percentage=sim_params.n_subcl_percentage, ) pca_success = subcluster_runner.run_subclustering() pca_end_time = time.time() logger.info(f"PCA Subclustering Time: {pca_end_time - pca_start_time:.2f} seconds") # --- Enhanced PCA Failure Handling --- if not pca_success or subcluster_runner.not_able_pca: failed_subcluster_num = getattr( subcluster_runner, "number_clusters_processed", "N/A" ) if failed_subcluster_num != "N/A": failed_subcluster_num += 1 # Adjust to 1-based index logger.error( f"PCA Subclustering failed (Failed on Subcluster {failed_subcluster_num})." ) logger.error("PCA Failure Diagnosis & Suggestions:") logger.error( f" - The current target parameters (Df={sim_params.Df}, kf={sim_params.kf}) might be geometrically challenging during PCA." ) logger.error(" - Common Fixes for PCA Failure (Try in order):") logger.error( f" 1. Increase target kf: Try `--kf {sim_params.kf + 0.1:.1f}` or `--kf {sim_params.kf + 0.2:.1f}`. (Often helps if Gamma or Sticking fails)." # Decreasing kf is less common for PCA failures seen so far, but possible if Gamma calc itself fails # logger.error(f" Alt. Decrease target kf: Try `--kf {max(0.1, sim_params.kf - 0.1):.1f}` (May help if Gamma calculation fails).") ) logger.error( f" 2. Increase target Df: Try `--df {sim_params.Df + 0.05:.2f}` or `--df {sim_params.Df + 0.1:.1f}`." ) logger.error( f" 3. Increase overlap tolerance: Try `--tol-ov 1e-5` or `--tol-ov 1e-4` (If failure is during sticking/rotation)." ) logger.error( f" 4. Reduce subcluster size: Try `--n-subcl-perc {max(0.02, sim_params.n_subcl_percentage * 0.8):.2f}` (e.g., 0.08 if currently 0.1)." ) logger.error( " 5. Try a different random seed: Add `--seed <number>` or change the existing seed." ) logger.error( " 6. Increase max attempts: Use `--max-attempts <number>` (e.g., 10)." ) return False, None, None # --- End Enhanced PCA Handling --- # Retrieve results only if PCA succeeded num_clusters, not_able_pca_flag, pca_coords_radii, pca_i_orden, _ = ( subcluster_runner.get_results() ) # Double check, though should be caught above if not_able_pca_flag or pca_coords_radii is None or pca_i_orden is None: logger.error("PCA returned invalid results despite reporting success.") return False, None, None # 4. Cluster-Cluster Aggregation logger.info("--- Starting Cluster-Cluster Aggregation ---") cca_start_time = time.time() cca_runner = CCAggregator( initial_coords=pca_coords_radii[:, :3], initial_radii=pca_coords_radii[:, 3], initial_i_orden=pca_i_orden, n_total=sim_params.N, df=sim_params.Df, kf=sim_params.kf, tol_ov=sim_params.tol_ov, ext_case=sim_params.ext_case, ) cca_result = cca_runner.run_cca() cca_end_time = time.time() logger.info(f"CCA Aggregation Time: {cca_end_time - cca_start_time:.2f} seconds") # --- Enhanced CCA Failure Handling --- if cca_result is None or cca_runner.not_able_cca: logger.error("CCA Aggregation failed.") logger.error("CCA Failure Diagnosis & Suggestions:") logger.error( " - Failure often occurs during cluster pairing or sticking due to geometric constraints from target Df/kf." ) logger.error( f" - Check logs for WARNings about 'RELAXED condition' during pairing or 'No initial candidates found' / 'Sticking failed' during sticking." ) logger.error(" - Common Fixes for CCA Failure (Try in order):") logger.error( f" 1. Increase target kf: Try `--kf {sim_params.kf + 0.1:.1f}` or `--kf {sim_params.kf + 0.2:.1f}`. (Often helps relax pairing/sticking)." ) logger.error( f" 2. Increase target Df: Try `--df {sim_params.Df + 0.05:.2f}` or `--df {sim_params.Df + 0.1:.1f}`." ) logger.error( f" 3. Check CCA Pairing Factor: If warnings about RELAXED condition were frequent, the factor in `cca_agg.py` might need adjustment (currently hardcoded)." ) logger.error( " 4. Try a different random seed: Add `--seed <number>` or change the existing seed (affects PCA structure)." ) logger.error( " 5. Increase max attempts: Use `--max-attempts <number>` (e.g., 10)." ) return False, None, None # --- End Enhanced CCA Handling --- # 5. Prepare Results (Only if CCA succeeded) final_coords, final_radii = cca_result n_actual = final_coords.shape[0] # Calculate final properties including Rg final_rg = 0.0 final_cm = [0.0, 0.0, 0.0] # Use list default if n_actual > 0: try: # Pass target Df/kf for final property calculation consistency final_mass, final_rg_val, final_cm_arr, final_r_max = ( utils.calculate_cluster_properties( final_coords, final_radii, sim_params.Df, sim_params.kf, ) ) # Handle potential None return from calculate_rg inside calculate_cluster_properties final_rg = final_rg_val if final_rg_val is not None else 0.0 final_cm = ( final_cm_arr.tolist() if final_cm_arr is not None else [0.0, 0.0, 0.0] ) logger.info(f"Final Aggregate Calculated Rg: {final_rg:.4f}") except Exception as e: logger.warning(f"Could not calculate final aggregate properties: {e}") final_rg = None # Use None if calculation failed final_cm = None # Create Metadata gen_info = GenerationInfo(iteration=iteration) agg_props = AggregateProperties( N_particles_actual=n_actual, radius_of_gyration=final_rg, center_of_mass=final_cm, ) metadata_instance = Metadata( generation_info=gen_info, simulation_parameters=sim_params, aggregate_properties=agg_props, ) # 6. Save Results metadata_instance.save_to_file( folderpath=output_base_dir, coords=final_coords, radii=final_radii, ) end_time = time.time() logger.info( f"===== Aggregate {iteration} Finished Successfully ({end_time - start_time:.2f} seconds) =====" ) return True, final_coords, final_radii