# Source code for pyfracval.pca_subclusters

"""Divides initial particles into subclusters using PCA."""

import logging
import math

import numpy as np

from .pca_agg import PCAggregator

# Ensure access to utils if needed, though likely not directly
# from . import utils

logger = logging.getLogger(__name__)


[docs] class Subclusterer: """Handles division of particles into subclusters using PCA. Takes a full set of initial particle radii, determines appropriate subcluster sizes, and runs the `PCAggregator` on each subset of radii to generate initial cluster structures. These subclusters are intended as input for subsequent Cluster-Cluster Aggregation (CCA). Parameters ---------- initial_radii : np.ndarray 1D array of all initial primary particle radii. df : float Target fractal dimension (passed to `PCAggregator`). kf : float Target fractal prefactor (passed to `PCAggregator`). tol_ov : float Overlap tolerance (passed to `PCAggregator`). n_subcl_percentage : float Target fraction of N used to determine the approximate size of each subcluster. Actual sizes may vary. Attributes ---------- N : int Total number of particles. all_coords : np.ndarray Nx3 array storing coordinates of all particles after PCA subclustering. all_radii : np.ndarray N array storing radii of all particles (should match initial radii order if PCA doesn't reorder, but uses radii from PCA output). i_orden : np.ndarray | None Mx3 array defining the start index, end index (inclusive), and count for each generated subcluster within the `all_coords`/`all_radii` arrays. None until `run_subclustering` is successful. number_clusters : int The number of subclusters generated. not_able_pca : bool Flag indicating if any PCA run for a subcluster failed. number_clusters_processed : int Index of the last subcluster processed (useful for error reporting). """ def __init__( self, initial_radii: np.ndarray, df: float, # Target Df for final aggregate kf: float, # Target kf for final aggregate tol_ov: float, n_subcl_percentage: float, # Optional overrides for PCA stage Df/kf could be added here # pca_df_override: float | None = None, # pca_kf_override: float | None = None, ):
[docs] self.N = len(initial_radii)
self.initial_radii = initial_radii.copy() # Use a copy self.df = df # Store target Df self.kf = kf # Store target kf self.tol_ov = tol_ov self.n_subcl_percentage = n_subcl_percentage # self.pca_df_override = pca_df_override # Store overrides if used # self.pca_kf_override = pca_kf_override
[docs] self.all_coords = np.zeros((self.N, 3), dtype=float)
[docs] self.all_radii = np.zeros(self.N, dtype=float)
[docs] self.i_orden: np.ndarray | None = None
[docs] self.number_clusters: int = 0
[docs] self.not_able_pca: bool = False
[docs] self.number_clusters_processed = 0 # Track for error reporting
def _determine_subcluster_sizes(self) -> np.ndarray: """Calculates the size of each subcluster.""" # --- Heuristic for N_subcl based on N (from Fortran comments/logic) --- if self.N < 50: # Fortran uses Nsub=5, leading to many small clusters # Let's use the percentage but ensure min size (e.g., 5?) n_subcl_target = max(5, int(self.n_subcl_percentage * self.N)) # Ensure n_subcl is at least 2 n_subcl = max(2, n_subcl_target) elif self.N > 500: # Fortran uses Nsub=50 n_subcl_target = max(50, int(self.n_subcl_percentage * self.N)) n_subcl = n_subcl_target # Allow larger for large N if percentage dictates else: # 50 <= N <= 500 # Use percentage, but ensure min size (e.g., 5 or 10?) n_subcl_target = max(10, int(self.n_subcl_percentage * self.N)) n_subcl = n_subcl_target # Ensure n_subcl is not larger than N n_subcl = min(n_subcl, self.N) # Calculate number of clusters needed # Handle case where N < n_subcl (results in 1 cluster) if self.N < n_subcl: self.number_clusters = 1 subcluster_sizes = np.array([self.N], dtype=int) else: self.number_clusters = math.ceil(self.N / n_subcl) subcluster_sizes = np.full(self.number_clusters, n_subcl, dtype=int) # Adjust the last cluster size if N is not perfectly divisible remainder = self.N % n_subcl if remainder != 0: actual_size_last = self.N - n_subcl * (self.number_clusters - 1) subcluster_sizes[-1] = actual_size_last # elif self.N % n_subcl == 0: # No adjustment needed # pass # Sanity check if np.sum(subcluster_sizes) != self.N: logger.error( # Changed to error as this indicates a logic flaw f"Subcluster size calculation error: Sum={np.sum(subcluster_sizes)} != N={self.N}. Sizes={subcluster_sizes}" ) # Attempt recovery, though it might mask the root cause subcluster_sizes[-1] = self.N - np.sum(subcluster_sizes[:-1]) if subcluster_sizes[-1] <= 0: # Check if last size became invalid raise ValueError( "Subcluster size calculation resulted in non-positive size." 
) logger.warning(f"Corrected last size to: {subcluster_sizes[-1]}") logger.info( f"Subclustering N={self.N} into {self.number_clusters} clusters with target size ~{n_subcl}." ) logger.info(f"Actual sizes: {subcluster_sizes}") return subcluster_sizes
[docs] def run_subclustering(self) -> bool: """Perform the subclustering process. Determines subcluster sizes, then iterates through subsets of the initial radii, running `PCAggregator` for each subset. Stores the resulting coordinates and radii contiguously and updates `i_orden`. Returns ------- bool True if all subclusters were generated successfully, False otherwise. Sets `self.not_able_pca` to True on failure. """ subcluster_sizes = self._determine_subcluster_sizes() # Handle the edge case of only 1 cluster (N < n_subcl or N=n_subcl) if self.number_clusters == 1 and subcluster_sizes[0] == self.N: logger.info( "Only one subcluster required (N <= effective n_subcl). Running PCA on all particles." ) # Proceed with the loop below, it will just run once. self.i_orden = np.zeros((self.number_clusters, 3), dtype=int) self.not_able_pca = False current_n_start_idx = 0 # Index in the initial_radii array current_fill_idx = 0 # Index in the final all_coords/all_radii pca_df = self.df pca_kf = self.kf # --- Define Df/kf to use *specifically* for PCA --- # Use fixed, stable values, e.g., typical DLCA or Filippov mono values # pca_df = 1.79 # pca_kf = 1.40 # Alternatively, use overrides if they were passed during init: # pca_df = self.pca_df_override if self.pca_df_override is not None else 1.79 # pca_kf = self.pca_kf_override if self.pca_kf_override is not None else 1.40 # --- Do NOT use self.target_df / self.target_kf here if they cause issues --- logger.info( f"--- Using fixed parameters for PCA stage: Df={pca_df:.2f}, kf={pca_kf:.2f} ---" ) for i in range(self.number_clusters): self.number_clusters_processed = i # Track for error reporting num_particles_in_subcluster = subcluster_sizes[i] logger.info( f"--- Processing Subcluster {i + 1}/{self.number_clusters} (Size: {num_particles_in_subcluster}) ---" ) if num_particles_in_subcluster < 2: logger.error( f"Subcluster {i + 1} has size {num_particles_in_subcluster}, needs >= 2 for PCA." 
) # This should not happen if _determine_subcluster_sizes is correct self.not_able_pca = True return False # Extract radii for this subcluster idx_start = current_n_start_idx idx_end = current_n_start_idx + num_particles_in_subcluster subcluster_radii = self.initial_radii[idx_start:idx_end] # Run PCA for this subcluster using the *fixed* pca_df, pca_kf pca_runner = PCAggregator( subcluster_radii, pca_df, # Use fixed PCA Df pca_kf, # Use fixed PCA kf self.tol_ov, ) subcluster_data = pca_runner.run() # Returns Nx4 [X,Y,Z,R] or None if subcluster_data is None or pca_runner.not_able_pca: logger.error(f"PCA failed for subcluster {i + 1}.") self.not_able_pca = True # No need to return immediately, let main_runner handle retry return False # Signal failure for this attempt # Store the results num_added = subcluster_data.shape[0] # Basic check if PCA returned expected number of particles if num_added != num_particles_in_subcluster: logger.warning( f"PCA for subcluster {i + 1} returned {num_added} particles, expected {num_particles_in_subcluster}." ) # This might indicate internal PCA issues, but proceed cautiously if current_fill_idx + num_added > self.N: logger.error( f"Exceeding total particle count N during subclustering " f"(current_fill_idx={current_fill_idx}, num_added={num_added}, N={self.N})." 
) self.not_able_pca = True return False fill_slice = slice(current_fill_idx, current_fill_idx + num_added) self.all_coords[fill_slice, :] = subcluster_data[:, :3] self.all_radii[fill_slice] = subcluster_data[:, 3] # Update i_orden (0-based inclusive indices) start_cluster_idx = current_fill_idx end_cluster_idx = current_fill_idx + num_added - 1 self.i_orden[i, :] = [start_cluster_idx, end_cluster_idx, num_added] # Update indices for next iteration current_n_start_idx += ( num_particles_in_subcluster # Advance by expected size ) current_fill_idx += num_added # Advance by actual added size # Final check after loop if current_fill_idx != self.N: logger.warning( f"Final particle count ({current_fill_idx}) after subclustering does not match N ({self.N}). " f"This might indicate inconsistent particle counts returned by PCA runs." ) # Correct i_orden if necessary? Might be complex. Let CCA handle potential mismatch. logger.info("PCA Subclustering completed for this attempt.") return True # Success for this attempt
[docs] def get_results( self, ) -> tuple[int, bool, np.ndarray | None, np.ndarray | None, np.ndarray | None]: """Return the results of the subclustering process. Returns ------- tuple[int, bool, np.ndarray | None, np.ndarray | None, np.ndarray | None] A tuple containing: - number_clusters (int): The intended number of clusters. - not_able_pca (bool): Flag indicating if any PCA failed. - combined_data (np.ndarray | None): Nx4 array [X, Y, Z, R] of all particles, or None on failure. - i_orden (np.ndarray | None): Mx3 array describing subcluster indices, or None on failure. - final_radii (np.ndarray | None): N array of radii corresponding to `combined_data`, or None on failure. """ if self.not_able_pca or self.i_orden is None: # Check i_orden initialization return 0, True, None, None, None else: # Combine coords and radii into the 'Data' format [X, Y, Z, R] # Ensure slicing is correct if fill_idx != N final_count = self.i_orden[-1, 1] + 1 if self.i_orden.shape[0] > 0 else 0 if final_count != self.N: logger.warning( f"get_results: final count in i_orden ({final_count}) != N ({self.N}). Returning sliced data." ) combined_data = np.hstack( ( self.all_coords[:final_count], self.all_radii[:final_count].reshape(-1, 1), ) ) # Return only the valid part of i_orden if fewer clusters were made (shouldn't happen here) valid_i_orden = ( self.i_orden[: self.number_clusters_processed + 1, :] if self.number_clusters_processed + 1 < self.number_clusters else self.i_orden ) return ( self.number_clusters, # Still return the intended number False, combined_data, valid_i_orden, self.all_radii[:final_count], # Return only valid radii )