"""Implements Particle-Cluster Aggregation (PCA) for initial subclusters."""
import logging
import numpy as np
from . import config, utils
from .logs import TRACE_LEVEL_NUM
logger = logging.getLogger(__name__)
FLOATING_POINT_ERROR = 1e-9  # Unused here: the module consistently uses utils.FLOATING_POINT_ERROR.
class PCAggregator:
"""Performs Particle-Cluster Aggregation (PCA).
Builds a single cluster by sequentially adding individual primary
particles (monomers) to a growing aggregate, attempting to match
the target Df and kf via the Gamma_pc calculation at each step.
Includes overlap checking and rotation to find valid placements.
Parameters
----------
initial_radii : np.ndarray
1D array of radii for the primary particles to be aggregated.
df : float
Target fractal dimension for the aggregate.
kf : float
Target fractal prefactor for the aggregate.
tol_ov : float
Maximum allowable overlap fraction between particles.
Attributes
----------
N : int
Total number of particles to aggregate.
initial_mass : np.ndarray
Calculated initial masses corresponding to `initial_radii`.
coords : np.ndarray
Nx3 array storing coordinates of particles as they are placed.
radii : np.ndarray
N array storing radii of particles as they are placed.
mass : np.ndarray
N array storing masses of particles as they are placed.
n1 : int
Number of particles currently in the aggregate.
m1 : float
Mass of the current aggregate.
rg1 : float
Radius of gyration of the current aggregate.
cm : np.ndarray
3D center of mass of the current aggregate.
r_max : float
Maximum distance from CM to any particle center in the aggregate.
not_able_pca : bool
Flag indicating if the aggregation process failed.
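
    Examples
    --------
    A minimal usage sketch; the radii, df, kf and tol_ov values below are
    illustrative assumptions, not library defaults:

    >>> import numpy as np
    >>> radii = np.full(50, 10.0)
    >>> agg = PCAggregator(radii, df=1.8, kf=1.3, tol_ov=1e-3)
    >>> data = agg.run()  # Nx4 array [X, Y, Z, R], or None if PCA failed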
"""
def __init__(self, initial_radii: np.ndarray, df: float, kf: float, tol_ov: float):
self.N = len(initial_radii)
if self.N < 2:
raise ValueError("PCA requires at least 2 particles.")
self.initial_radii = initial_radii.copy()
# Calculate initial mass using utils consistently
self.initial_mass = utils.calculate_mass(self.initial_radii)
self.df = df
self.kf = kf
self.tol_ov = tol_ov
# State variables for the growing cluster
self.coords = np.zeros((self.N, 3), dtype=float)
self.radii = np.zeros(self.N, dtype=float)
self.mass = np.zeros(self.N, dtype=float)
self.n1: int = 0 # Number of particles currently in the aggregate
self.m1: float = 0.0 # Mass of the current aggregate
self.rg1: float = 0.0 # Radius of gyration of the current aggregate
self.cm = np.zeros(3) # Center of mass of the current aggregate
self.r_max: float = 0.0 # Max distance from CM in the current aggregate
self.not_able_pca: bool = False
def _random_point_sphere(self) -> tuple[float, float]:
"""Generates random angles (theta, phi) for a point on a sphere."""
u, v = np.random.rand(2)
# Use constant from config
theta = 2.0 * config.PI * u
phi = np.arccos(2.0 * v - 1.0)
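        # theta uniform in [0, 2*pi) and phi = arccos(2*v - 1) together give points
        # uniformly distributed over the surface of the unit sphere.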
return theta, phi
def _first_two_monomers(self):
"""Places the first two monomers."""
if self.N < 2:
return # Should be caught by __init__ but safe check
self.radii[0] = self.initial_radii[0]
self.radii[1] = self.initial_radii[1]
self.mass[0] = self.initial_mass[0]
self.mass[1] = self.initial_mass[1]
# Place first particle at origin
self.coords[0, :] = 0.0
        # Place the second particle touching the first with a random orientation
        # (the original Fortran behaviour). A deterministic alternative (along +X)
        # is kept below for reference:
        # distance = self.radii[0] + self.radii[1]
        # self.coords[1, :] = [distance, 0.0, 0.0]
theta, phi = self._random_point_sphere()
distance = self.radii[0] + self.radii[1]
self.coords[1, 0] = self.coords[0, 0] + distance * np.cos(theta) * np.sin(phi)
self.coords[1, 1] = self.coords[0, 1] + distance * np.sin(theta) * np.sin(phi)
self.coords[1, 2] = self.coords[0, 2] + distance * np.cos(phi)
self.n1 = 2
self.m1 = self.mass[0] + self.mass[1]
# Use utils for rg calculation
self.rg1 = utils.calculate_rg(self.radii[: self.n1], self.n1, self.df, self.kf)
if self.m1 > utils.FLOATING_POINT_ERROR: # Use utils tolerance
self.cm = (
self.coords[0] * self.mass[0] + self.coords[1] * self.mass[1]
) / self.m1
else:
self.cm = np.mean(self.coords[: self.n1], axis=0) # Use n1 slice
# Initial r_max (max distance from CM)
dist_0_cm = np.linalg.norm(self.coords[0] - self.cm)
dist_1_cm = np.linalg.norm(self.coords[1] - self.cm)
self.r_max = max(dist_0_cm, dist_1_cm)
def _gamma_calculation(self, m2: float, rg2: float) -> tuple[bool, float]:
"""
Calculates Gamma_pc for adding the next monomer (aggregate 2).
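
        Gamma_pc is the aggregate-to-monomer center distance that reproduces the
        target combined radius of gyration rg3. Rearranging the standard two-body
        decomposition of the radius of gyration (with m3 = m1 + m2) gives the
        expression evaluated below:

            Gamma_pc**2 = (m3**2 * rg3**2 - m3 * (m1 * rg1**2 + m2 * rg2**2)) / (m1 * m2)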
"""
n2 = 1
n3 = self.n1 + n2
m3 = self.m1 + m2
# Ensure index is valid before accessing initial_radii
if self.n1 >= self.N:
logger.error(
f"Gamma calculation requested for particle index {self.n1} >= N ({self.N})"
)
return False, 0.0
# Radii of particles already in cluster + the next one to be added
combined_radii = np.concatenate(
(self.radii[: self.n1], [self.initial_radii[self.n1]])
)
rg3 = utils.calculate_rg(combined_radii, n3, self.df, self.kf)
# Heuristic from Fortran: ensure rg3 is not smaller than rg1
# (avoids issues if rg calculation is noisy for small N)
if self.rg1 > 0 and rg3 < self.rg1:
logger.info(
f"Gamma calc: Adjusted rg3 from {rg3:.2e} to match rg1 {self.rg1:.2e}"
)
rg3 = self.rg1
gamma_pc = 0.0
gamma_real = False
try:
term1 = (m3**2) * (rg3**2)
term2 = m3 * (self.m1 * self.rg1**2 + m2 * rg2**2) # rg2 is for monomer
denominator = self.m1 * m2
# Check if radicand is positive and denominator is non-zero
radicand = term1 - term2
if (
radicand > utils.FLOATING_POINT_ERROR
and denominator > utils.FLOATING_POINT_ERROR
): # Use tolerance
gamma_pc = np.sqrt(radicand / denominator)
gamma_real = True
else:
# Keep gamma_real False
logger.debug(
f"Gamma_pc calculation non-real or denominator zero: "
f"n1={self.n1}, m1={self.m1:.2e}, rg1={self.rg1:.2e}, "
f"m2={m2:.2e}, rg2={rg2:.2e}, "
f"m3={m3:.2e}, rg3={rg3:.2e} -> "
f"radicand={radicand:.2e}, denominator={denominator:.2e}"
)
except (ValueError, ZeroDivisionError, OverflowError) as e:
logger.warning(f"Gamma calculation internal failed: {e}")
gamma_real = False
return gamma_real, gamma_pc
def _select_candidates(
self, radius_k: float, gamma_pc: float, gamma_real: bool
) -> tuple[np.ndarray, float]:
"""
Generates the list of candidate particles (indices within 0 to n1-1)
that monomer 'k' could stick to, based on Gamma_pc geometry.
Returns the candidate indices and Rmax (max distance from CM).
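
        A particle i at distance d_i from the aggregate CM qualifies when, within a
        small tolerance, radius_k + radii[i] <= gamma_pc and
        gamma_pc - (radius_k + radii[i]) < d_i <= gamma_pc + (radius_k + radii[i]),
        i.e. when a sphere of radius radius_k + radii[i] centered on particle i can
        intersect the sphere of radius gamma_pc centered on the aggregate CM.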
"""
candidates = []
# If gamma is not real, sticking based on this criterion is impossible.
if not gamma_real:
logger.debug(
"Gamma_pc not real in PCA candidate selection. No candidates selected."
)
return np.array([], dtype=int), self.r_max # Return current r_max
# Rmax needs to be tracked based on *all* particles in the cluster
if self.n1 > 0:
distances_sq = np.sum((self.coords[: self.n1] - self.cm) ** 2, axis=1)
self.r_max = np.sqrt(np.max(distances_sq)) if distances_sq.size > 0 else 0.0
logger.debug(
f" _select_candidates: Checking N1={self.n1} particles against Gamma_pc={gamma_pc:.4f}, R_k={radius_k:.4f}"
)
for i in range(self.n1): # Iterate through particles 0 to n1-1
dist_sq = np.sum((self.coords[i] - self.cm) ** 2)
dist = np.sqrt(dist_sq)
radius_i = self.radii[i] # Radius of particle 'i' in the cluster
radius_sum = radius_k + radius_i
lower_dist_bound = gamma_pc - radius_sum
upper_dist_bound = gamma_pc + radius_sum
# Fortran conditions (translated):
# 1) (R_k + R_i) <= Gamma_pc
radius_sum_check = radius_sum <= gamma_pc + utils.FLOATING_POINT_ERROR
# 2) dist > (Gamma_pc - R_k - R_i)
lower_bound_check = dist > lower_dist_bound - utils.FLOATING_POINT_ERROR
# 3) dist <= (Gamma_pc + R_k + R_i)
upper_bound_check = dist <= upper_dist_bound + utils.FLOATING_POINT_ERROR
logger.debug(
f" Cand i={i}: Dist={dist:.4f}, R_i={radius_i:.4f} | "
f"Cond1 (Rk+Ri <= G): {radius_sum:.4f} <= {gamma_pc:.4f}? -> {radius_sum_check} | "
f"Cond2 (Dist > G-Rk-Ri): {dist:.4f} > {lower_dist_bound:.4f}? -> {lower_bound_check} | "
f"Cond3 (Dist <= G+Rk+Ri): {dist:.4f} <= {upper_dist_bound:.4f}? -> {upper_bound_check}"
)
if radius_sum_check and lower_bound_check and upper_bound_check:
candidates.append(i)
logger.debug(f" -> Candidate {i} ADDED.")
logger.debug(
f"PCA selecting candidates for radius {radius_k:.2f} (gamma={gamma_pc:.3f}): Found {len(candidates)} candidates from {self.n1} particles."
)
return np.array(candidates, dtype=int), self.r_max # Return updated r_max
def _search_and_select_candidate(
self, k: int, considered_indices: list[int]
) -> tuple[int, float, float, bool, float, np.ndarray]:
"""
Handles the complex logic of selecting a candidate, potentially swapping
monomer 'k' with another if the initial attempt yields no candidates.
Corresponds roughly to the loop calling `Search_list` and `Random_select_list`.
Returns:
tuple: (selected_idx, m2, rg2, gamma_real, gamma_pc, candidate_list)
Returns -1 for selected_idx if no candidate found after all attempts.
candidate_list is the list of indices (0 to n1-1) found for the *final* particle k.
"""
available_monomers = list(range(k, self.N)) # Indices of unprocessed monomers
tried_swaps = {k} # Monomers tried *at position k*
while True:
# --- Try with current monomer k ---
current_k_idx = k # The actual index in the original list being processed
current_k_radius = self.initial_radii[current_k_idx]
current_k_mass = self.initial_mass[current_k_idx]
# Rg of a single sphere = sqrt(3/5) * R => sqrt(0.6) * R
current_k_rg = np.sqrt(0.6) * current_k_radius
gamma_real, gamma_pc = self._gamma_calculation(current_k_mass, current_k_rg)
logger.debug(
f"PCA search k={k}: Radius={current_k_radius:.2f}, Gamma_real={gamma_real}, Gamma_pc={gamma_pc:.4f}"
)
candidates = np.array([], dtype=int)
if gamma_real:
# Rmax is updated inside _select_candidates
candidates, self.r_max = self._select_candidates(
current_k_radius, gamma_pc, gamma_real
)
logger.debug(
f"PCA search k={k}: Found {len(candidates)} candidates: {candidates}"
)
if len(candidates) > 0:
# Select one candidate randomly (will be used as starting point in run loop)
idx_in_candidates = np.random.randint(len(candidates))
selected_initial_candidate = candidates[idx_in_candidates]
logger.debug(
f"PCA search k={k}: Initial candidate {selected_initial_candidate} selected from {len(candidates)} options."
)
# Return the *full list* of candidates found for this k
return (
selected_initial_candidate,
current_k_mass,
current_k_rg,
gamma_real,
gamma_pc,
candidates, # Return the list of all candidates
)
else:
logger.debug(
f"PCA search k={k}: No candidates found or gamma not real. Looking for swap."
)
# --- No candidates: Try swapping k with an untried, available monomer ---
# Find monomers eligible for swapping (not k itself, not already tried at pos k,
# and not already successfully placed in the aggregate)
eligible_for_swap = [
idx
for idx in available_monomers
if idx not in tried_swaps and idx not in considered_indices
]
if not eligible_for_swap:
# No more monomers to swap with
logger.warning(
f"PCA k={k}: No candidates found and no more available monomers to swap with."
)
return (
-1, # Indicate failure
current_k_mass,
current_k_rg,
gamma_real,
gamma_pc,
np.array([], dtype=int), # Empty candidate list
)
# Select a random monomer to swap with k
swap_idx_in_eligible = np.random.randint(len(eligible_for_swap))
swap_target_original_idx = eligible_for_swap[swap_idx_in_eligible]
logger.debug(
f" PCA k={k}: Swapping with monomer original index {swap_target_original_idx}."
)
# Perform the swap in the initial_radii and initial_mass arrays
# Swap particle at index k with particle at swap_target_original_idx
self.initial_radii[k], self.initial_radii[swap_target_original_idx] = (
self.initial_radii[swap_target_original_idx],
self.initial_radii[k],
)
self.initial_mass[k], self.initial_mass[swap_target_original_idx] = (
self.initial_mass[swap_target_original_idx],
self.initial_mass[k],
)
# Note: The particle originally at k is now at swap_target_original_idx
# The particle originally at swap_target_original_idx is now at k
# Mark the monomer *originally* at swap_target_original_idx as having been tried *at position k*
tried_swaps.add(swap_target_original_idx)
# Loop continues, recalculating gamma/candidates with the new monomer now at index k
logger.debug(
f"PCA search k={k}: Swapped monomer from index {swap_target_original_idx} into position {k}. Retrying gamma/candidate search."
)
# The state of self.initial_radii/mass at index k has changed, restart loop
def _sticking_process(
self, k: int, selected_idx: int, gamma_pc: float
) -> tuple[np.ndarray | None, float, np.ndarray, np.ndarray, np.ndarray]:
"""
Calculates the geometric parameters for placing monomer k based on
intersection of two spheres.
Sphere 1: Center = selected particle coords, Radius = R_sel + R_k
Sphere 2: Center = CM of aggregate, Radius = Gamma_pc
Returns:
tuple: (coord_k_initial, theta_a, vec_0, i_vec, j_vec) or (None, ...) if intersection fails.
coord_k_initial is *one* point on the intersection circle.
Other return values define the circle for rotation (_reintento).
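
        Notes:
            The two spheres can intersect in a circle only when the distance d
            between their centers satisfies |R1 - R2| <= d <= R1 + R2; the failure
            branch below logs which of these bounds was violated.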
"""
if selected_idx < 0 or selected_idx >= self.n1:
logger.error(
f"Sticking process called with invalid selected_idx={selected_idx} (n1={self.n1})"
)
return None, 0.0, np.zeros(4), np.zeros(3), np.zeros(3)
if k < 0 or k >= self.N:
logger.error(f"Sticking process called with invalid k={k} (N={self.N})")
return None, 0.0, np.zeros(4), np.zeros(3), np.zeros(3)
coord_sel = self.coords[selected_idx]
radius_sel = self.radii[selected_idx]
# Use initial radius of particle k before it's placed
radius_k = self.initial_radii[k]
# Define the two spheres for intersection
sphere1_center = coord_sel
sphere1_radius = radius_sel + radius_k
sphere1 = np.concatenate((sphere1_center, [sphere1_radius]))
sphere2_center = self.cm # Use current aggregate CM
sphere2_radius = gamma_pc
sphere2 = np.concatenate((sphere2_center, [sphere2_radius]))
intersection_valid = False
try:
# This utility finds the circle and returns *one* random point on it
x_k, y_k, z_k, theta_a, vec_0, i_vec, j_vec, intersection_valid = (
utils.two_sphere_intersection(sphere1, sphere2)
)
except Exception as e:
logger.error(
f"Error during two_sphere_intersection call: {e}", exc_info=True
)
intersection_valid = False # Ensure it's false on exception
if not intersection_valid:
logger.warning(
f"PCA sticking sphere intersection failed for k={k}, sel={selected_idx}."
)
# Log details to help diagnose intersection failures
dist_centers = np.linalg.norm(sphere1_center - sphere2_center)
radius_sum = sphere1_radius + sphere2_radius
radius_diff = abs(sphere1_radius - sphere2_radius)
logger.warning(
f" Intersection Fail Details: Center1={sphere1_center}, R1={sphere1_radius:.4f} | "
f"Center2={sphere2_center}, R2={sphere2_radius:.4f} | "
f"Dist={dist_centers:.4f}, R1+R2={radius_sum:.4f}, |R1-R2|={radius_diff:.4f}"
)
# Check conditions violated by intersection check in utils
if dist_centers > radius_sum + utils.FLOATING_POINT_ERROR:
logger.warning(" -> Spheres too far apart.")
if dist_centers < radius_diff - utils.FLOATING_POINT_ERROR:
logger.warning(" -> Sphere contained.")
if (
dist_centers < utils.FLOATING_POINT_ERROR
and abs(radius_diff) < utils.FLOATING_POINT_ERROR
):
logger.warning(" -> Spheres coincide.")
return None, 0.0, np.zeros(4), np.zeros(3), np.zeros(3) # Indicate failure
# Return the initial point found and the circle parameters for rotation
coord_k_initial = np.array([x_k, y_k, z_k])
return coord_k_initial, theta_a, vec_0, i_vec, j_vec
    # The internal overlap check was removed; run() uses utils.calculate_max_overlap_pca instead.
def _reintento(
self, k: int, vec_0: np.ndarray, i_vec: np.ndarray, j_vec: np.ndarray
) -> tuple[np.ndarray, float]:
"""
Calculates a *new* random point on the intersection circle defined by
vec_0 (center, radius) and basis vectors i_vec, j_vec.
This is used to rotate monomer k to try and resolve overlaps.
Returns:
tuple: (coord_k_new, theta_a_new) - the new coordinates and the angle used.
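
        Notes:
            The new point is c + r0 * (cos(theta) * i_vec + sin(theta) * j_vec), with
            c = vec_0[:3], r0 = vec_0[3] and theta drawn uniformly from [0, 2*pi),
            as evaluated in the body below.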
"""
x0, y0, z0, r0 = vec_0
# If radius of intersection is near zero (touching point case), rotation is meaningless
if r0 < utils.FLOATING_POINT_ERROR:
logger.debug(
f"Reintento k={k}: Intersection radius near zero, no rotation possible."
)
# Return the center of the 'circle' (the touch point)
return np.array([x0, y0, z0]), 0.0
# Generate new random angle
theta_a_new = 2.0 * config.PI * np.random.rand()
        # Calculate the new position using the circle equation
        coord_k_new = (
            np.array([x0, y0, z0])
            + r0 * np.cos(theta_a_new) * i_vec
            + r0 * np.sin(theta_a_new) * j_vec
        )
return coord_k_new, theta_a_new
def run(self) -> np.ndarray | None:
"""Run the complete PCA process for all N particles.
Sequentially adds particles from index 2 to N-1. For each particle k,
it calculates Gamma_pc, finds potential sticking partners (`candidates`)
in the existing aggregate (0..k-1), potentially swaps particle k with
an unused one if no candidates are found initially.
It then attempts to stick particle k to each candidate partner,
calculating an initial position based on sphere intersections defined
by Gamma_pc. If overlap occurs, it rotates particle k around the
intersection circle (`_reintento`) up to `max_rotations` times.
If a non-overlapping position is found for any candidate, the particle
is successfully added, and aggregate properties are updated. If all
candidates and all rotations fail for a particle k, or if the initial
search/swap fails, the aggregation stops, `not_able_pca` is set True,
and None is returned.
Returns
-------
np.ndarray | None
An Nx4 NumPy array [X, Y, Z, R] of the final aggregate if
successful, otherwise None.
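
        Examples
        --------
        A sketch of how the result is typically consumed; the argument values
        are illustrative assumptions:

        >>> agg = PCAggregator(np.full(20, 5.0), df=1.8, kf=1.3, tol_ov=1e-3)
        >>> result = agg.run()
        >>> if result is None:
        ...     print("PCA failed:", agg.not_able_pca)
        ... else:
        ...     coords, radii = result[:, :3], result[:, 3]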
"""
if self.N < 2:
return None
self._first_two_monomers()
considered_indices = list(range(self.n1))
for k in range(self.n1, self.N):
logger.debug(f"--- PCA Step: Aggregating particle k={k} ---")
# --- Outer loop to allow re-searching/swapping if all candidates fail overlap ---
search_attempt = 0
            # Limit attempts to prevent infinite loops in edge cases
            max_search_attempts = self.N
sticking_successful = False
while not sticking_successful and search_attempt < max_search_attempts:
search_attempt += 1
logger.debug(f"PCA k={k}: Search/Swap Attempt #{search_attempt}")
# --- Perform Search/Swap (as before) ---
search_result = self._search_and_select_candidate(k, considered_indices)
(
initial_candidate_idx,
m2,
rg2,
gamma_real,
gamma_pc,
candidates_list,
) = search_result
# Check if search failed completely (no candidates even after swaps)
if (
initial_candidate_idx < 0 or not gamma_real
): # Don't need len(candidates_list) check here
logger.error(
f"PCA failed Search/Swap for k={k} (Attempt {search_attempt}). No valid gamma/candidates found even after swaps."
)
self.not_able_pca = True
return None # Cannot continue if search itself fails
# Store radius/mass for the current particle at index k
radius_k_current = self.initial_radii[k]
mass_k_current = self.initial_mass[k]
# --- Try Sticking with Found Candidates ---
if len(candidates_list) == 0:
logger.debug(
f"PCA k={k}, Attempt {search_attempt}: Search yielded Gamma but no candidates list. Retrying search/swap."
)
# Force the outer while loop to continue (effectively re-swaps)
# No need to do anything else, the while loop condition handles it
# This case might happen if _select_candidates fails geometrically
# even if gamma was real after a swap.
continue # Go to next iteration of the outer while loop
candidates_to_try = utils.shuffle_array(candidates_list.copy())
logger.debug(
f"PCA k={k}, Attempt {search_attempt}: Trying {len(candidates_to_try)} candidates: {candidates_to_try}"
)
all_candidates_failed_overlap = True # Assume failure until success
for current_selected_idx in candidates_to_try:
logger.debug(
f"PCA k={k}: Trying candidate partner index {current_selected_idx}"
)
stick_result = self._sticking_process(
k, current_selected_idx, gamma_pc
)
if stick_result is None or stick_result[0] is None:
logger.debug(
f" PCA k={k}, cand={current_selected_idx}: Sticking geometry failed."
)
continue # Try next candidate
coord_k_initial, theta_a, vec_0, i_vec, j_vec = stick_result
self.coords[k] = coord_k_initial
self.radii[k] = radius_k_current
self.mass[k] = mass_k_current
cov_max = utils.calculate_max_overlap_pca(
self.coords[: self.n1],
self.radii[: self.n1],
self.coords[k],
self.radii[k],
)
logger.debug(
f" PCA k={k}, cand={current_selected_idx}: Initial overlap = {cov_max:.4e}"
)
intento = 0
max_rotations = 360
while cov_max > self.tol_ov and intento < max_rotations:
intento += 1
coord_k_new, theta_a_new = self._reintento(
k, vec_0, i_vec, j_vec
)
self.coords[k] = coord_k_new
cov_max = utils.calculate_max_overlap_pca(
self.coords[: self.n1],
self.radii[: self.n1],
self.coords[k],
self.radii[k],
)
                        # Detailed per-particle overlap trace (TRACE level only)
if logger.isEnabledFor(TRACE_LEVEL_NUM): # TRACE level
ov_details = []
for idx_agg in range(self.n1):
if idx_agg == current_selected_idx:
                                    continue  # Skip the selected partner (in contact by construction)
ov_agg = 1 - (
np.linalg.norm(
self.coords[k] - self.coords[idx_agg]
)
/ (self.radii[k] + self.radii[idx_agg])
)
ov_details.append(f"vs{idx_agg}:{ov_agg:.2e}")
logger.log(
TRACE_LEVEL_NUM,
f" PCA k={k}, cand={current_selected_idx}, Rot {intento}: Overlap = {cov_max:.4e} ({', '.join(ov_details)})",
)
if cov_max <= self.tol_ov:
logger.debug(
f"PCA k={k}: Sticking successful with cand {current_selected_idx} after {intento} rotations."
)
sticking_successful = True # Set flag for outer loop
all_candidates_failed_overlap = (
False # Mark success for this attempt
)
break # Exit the 'for current_selected_idx' loop
else:
logger.debug(
f" PCA k={k}, cand={current_selected_idx}: Failed overlap after {max_rotations} rotations."
)
# Continue to the next candidate in candidates_to_try
# --- After trying all candidates for this search attempt ---
if all_candidates_failed_overlap:
logger.warning(
f"PCA k={k}, Attempt {search_attempt}: All {len(candidates_to_try)} candidates failed overlap check. Retrying search/swap..."
)
# Reset temporary placement before potentially swapping particle k
self.coords[k] = 0.0
self.radii[k] = 0.0
self.mass[k] = 0.0
# The outer `while not sticking_successful` loop will continue
# else: sticking_successful is True, outer while loop will exit
# --- After the outer while loop ---
if not sticking_successful:
# This happens if max_search_attempts was reached
logger.error(
f"PCA failed at k={k}. Could not find non-overlapping position "
f"after {max_search_attempts} search/swap attempts."
)
self.not_able_pca = True
return None # Critical failure
# --- Update aggregate properties (only if sticking was successful) ---
self.n1 += 1
m_old = self.m1
self.m1 += self.mass[k] # Use mass that was set during successful attempt
if self.m1 > utils.FLOATING_POINT_ERROR:
self.cm = (self.cm * m_old + self.coords[k] * self.mass[k]) / self.m1
else:
self.cm = np.mean(self.coords[: self.n1], axis=0)
self.rg1 = utils.calculate_rg(
self.radii[: self.n1], self.n1, self.df, self.kf
)
considered_indices.append(k)
logger.debug(
f"--- PCA Step: Successfully added particle k={k}. Aggregate size n1={self.n1} ---"
)
# --- End of k loop ---
        # --- Final checks and output assembly ---
if self.not_able_pca:
return None
final_data = np.hstack(
(self.coords[: self.N], self.radii[: self.N].reshape(-1, 1))
)
if np.any(np.isnan(final_data)):
logger.error("NaN detected in final PCA data.")
self.not_able_pca = True
return None
logger.info(f"PCA run completed successfully for N={self.N} particles.")
return final_data