Source code for src.space

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

"""
Simple interaction model with several bodies.

:module: space
:author: Le Bars, Yoann

This module is part of the pure Python benchmark.

This program is free software; you can redistribute it and/or modify it under
the terms of the GNU General Public License  as published by the
Free Software Foundation; either version 3 of the License, or (at your
option) any later version. See file `LICENSE` or go to:

https://www.gnu.org/licenses/gpl-3.0.html
"""


# Defines the public names of this module.
__all__ = ['Space']


from typing import Final
import time
import math
from dataclasses import dataclass
from multiprocessing import Pool, cpu_count
from scipy.spatial import cKDTree
import numpy as np
from PySide6.QtCore import qWarning, qDebug
from src.body import Bodies, BodyProxy
from src import constants


@dataclass
class CollisionContext:
    """
    A data container to hold all relevant information for a single collision event.
    This simplifies passing collision data between functions.
    """
    b1: BodyProxy
    b2: BodyProxy
    n_vec: np.ndarray
    r1_vec: np.ndarray
    r2_vec: np.ndarray
    v_rel: np.ndarray
    overlap: float

    @classmethod
    def from_bodies(cls, b1: BodyProxy, b2: BodyProxy) -> 'CollisionContext | None':
        """
        Factory method to create a CollisionContext if two bodies are colliding.

        :param BodyProxy b1: First body.
        :param BodyProxy b2: Second body.
        :returns: A CollisionContext object if a collision occurs, otherwise None.
        :rtype: CollisionContext | None
        """
        d_vec = b2.x - b1.x
        dist_sq = np.dot(d_vec, d_vec)
        r_sum = b1.r + b2.r
        if dist_sq >= r_sum**2 or dist_sq == 0:
            return None

        dist = math.sqrt(dist_sq)
        n_vec = d_vec / dist
        r1_vec = -b1.r * n_vec
        r2_vec = b2.r * n_vec
        v_rel = (b2.v + np.cross(b2.omega, r2_vec)) - \
                (b1.v + np.cross(b1.omega, r1_vec))
        overlap = (b1.r + b2.r) - dist

        return cls(b1, b2, n_vec, r1_vec, r2_vec, v_rel, overlap)


def _calculate_force_chunk(args: tuple) -> np.ndarray:
    """
    Worker function to calculate gravitational forces for a subset of bodies using a fully
    vectorized approach. This is a top-level function as required by `multiprocessing`.

    The function calculates the gravitational acceleration exerted by all bodies in the
    simulation on a specific subset (chunk) of bodies.

    :param tuple args: A tuple containing the following arguments:

                       - **indices** (*slice*): A slice object defining the subset of bodies
                         to calculate forces for.
                       - **positions** (*np.ndarray*): Array of positions for all bodies.
                         Shape: `(num_bodies, 3)`.
                       - **masses** (*np.ndarray*): Array of masses for all bodies.
                         Shape: `(num_bodies,)`.
                       - **G** (*float*): The universal gravitational constant.
                       - **epsilon_sq** (*float*): The squared softening factor to prevent
                         singularities.

    :returns: The calculated accelerations for the given indices.
    :rtype: np.ndarray
    """

    # Unpack arguments for clarity.
    indices, positions, masses, G, epsilon_sq = args            # pylint: disable=invalid-name

    # `diffs`: 3D array of position difference vectors.
    # Shape: (num_bodies_in_chunk, total_num_bodies, 3)
    diffs = positions - positions[indices, np.newaxis, :]
    # `dists_sq`: 2D array of squared distances.
    # We use `out=...` with `einsum` to avoid creating an intermediate `diffs**2` array.
    # Shape: (num_bodies_in_chunk, total_num_bodies)
    dists_sq = np.einsum('ijk,ijk->ij', diffs, diffs)

    # Apply softening factor to avoid division by zero for bodies at the same position.
    # This also prevents taking the square root of zero.
    dists_sq[dists_sq < epsilon_sq] = epsilon_sq

    # Calculate 1 / distance^3 in-place to save memory.
    dists_sq **= -1.5

    # `force_magnitudes` now contains `G * m_j / d_ij^3` for each pair.
    force_magnitudes = G * masses * dists_sq

    # Use einsum to calculate the sum of acceleration vectors. This is equivalent to
    # `np.sum(force_magnitudes[:, :, np.newaxis] * diffs, axis=1)` but can be faster.
    return np.einsum('ij,ijk->ik', force_magnitudes, diffs)


# pylint: disable=too-many-instance-attributes
[docs] class Space: """ Class describing a space in which move several bodies. :ivar float __dt: Model time step (in s). :ivar float __previous_dt: Previous model time step (in s). :ivar float __G: Universal gravity constant (in m³/kg/s²). :ivar float __epsilon: Computing precision. :ivar Bodies _bodies: The main SoA container. :ivar list[list[int]] _collision_graph: Graph of collisions. :ivar list[int] _colours: Colours used for spatialised collision handling. :ivar bool __diagnostics: Whether to enable performance profiling. :ivar Pool _pool: A pool of worker processes for parallel computation. :ivar float _last_total_energy: The total energy from the previous logged step. :ivar int _current_iteration: Helper for detailed logging. :ivar bool _logged_collision_details: Helper for detailed logging. """ __dt: float __previous_dt: float __G: float __epsilon: float _bodies: Bodies # Adjacency list for the collision graph _collision_graph: list[list[int]] _colours: list[int] # Node colours for graph colouring _pool: Pool _last_total_energy: float _prof_integration: float _prof_collision_detection: float _prof_collision_response: float _prof_force_calculation: float _current_iteration: int _logged_collision_details: bool def _initialize_bodies(self, n: int, dens: float, seed: int) -> None: # pylint: disable=too-many-locals """ Actual initialisation of the bodies. :param int n: Number of bodies. :param float dens: Bodies density. :param int seed: Seed for random number generation. """ # Use NumPy’s random generator for vectorized operations rng = np.random.default_rng(seed if seed != 0 else None) # Generate masses and radii for all bodies at once masses = rng.uniform(25.0, 75.0, n) radii = masses * dens # Generate initial velocities for all bodies at once velocities = rng.uniform(-5.0, 5.0, (n, 3)) # Maximum number of tries to position a body without overlap max_retries: Final[int] = 100 # Place bodies one by one, but with an optimized overlap check for i in range(n): # Mass, radius, and initial velocity for the current body. m, r, v0 = masses[i], radii[i], velocities[i] # Current body initial position. x0: np.ndarray placed = False for _ in range(max_retries): # Generate a new tentative position x0_try = rng.uniform(-100.0, 100.0, 3) x0 = x0_try # Store the last attempt # If it is the first body, no need to check for overlap if i == 0: x0 = x0_try placed = True break # Vectorized overlap check against all previously placed bodies. # This is much faster than iterating through each placed body. # `diffs` is an array of vectors from each existing body to the new # tentative position. NumPy broadcasting handles this efficiently. # Shape: (i, 3) where i is the number of bodies already placed. diffs = x0_try - self._bodies.x[:i] # `dists_sq` is an array of squared distances to each existing body. dists_sq = np.sum(diffs**2, axis=1) # `r_sums_sq` is an array of the squared sum of radii (the minimum # distance allowed to avoid overlap). r_sums_sq = (r + self._bodies.r[:i])**2 # `np.any()` checks if any squared distance is less than the required # minimum squared distance. If no overlaps are found, we accept the # position and break the retry loop. if not np.any(dists_sq < r_sums_sq): placed = True break if not placed: qWarning( f"Could not place body {i} without overlap after " f"{max_retries} attempts. Placing it at the last tried position, " "which may cause initial instability." ) self._bodies.emplace_back(m, r, x0, v0) def __init__(self, args: 'CmdLineArgs') -> None: """ Class initialiser. :param CmdLineArgs args: The command-line arguments containing simulation parameters. """ self.__dt = args.dt self.__previous_dt = args.dt self.__G = args.universal_g self.__epsilon = args.epsilon if args.epsilon > 0. else constants.EPSILON self._bodies = Bodies(args.n_bodies) self._current_iteration = 0 self._logged_collision_details = False self._collision_graph = [[] for _ in range(args.n_bodies)] self._colours = [-1] * args.n_bodies self._last_total_energy = 0.0 # --- Multiprocessing Pool Initialization --- # R1732 is a false positive here. The pool's lifecycle is tied to the Space object's # lifetime, not a smaller 'with' block. Cleanup is handled in the cleanup() method. self._pool = Pool(processes=cpu_count() # pylint: disable=consider-using-with ) self._initialize_bodies(args.n_bodies, args.dens, args.seed) self._prof_integration = 0.0 self._prof_collision_detection = 0.0 self._prof_collision_response = 0.0 self._prof_force_calculation = 0.0 # Initialize with a very high value to avoid false positive on first check self._last_total_energy = float('inf') # --- Adaptive Time-step Warm-up --- # The simulation starts with zero acceleration. If we used the default `dt`, the first # step might be too large if bodies are close, causing an instability. To prevent this, # we perform a few "warm-up" iterations with a very small `dt`. # This allows the system to calculate initial gravitational forces and accelerations, # and lets the adaptive time-stepper converge on a safe, stable `dt` before the # actual simulation begins. # We use a tiny initial dt to kickstart the process. self.__dt = self.__previous_dt = self.__epsilon # Run a few steps of the dynamics calculation. for _ in range(5): # This computes forces and updates `self.__dt` via the adaptive algorithm. self.compute_dynamics(0)
[docs] def cleanup(self) -> None: """ Clean up resources, such as closing the multiprocessing pool. """ # Close the pool to prevent any new tasks from being submitted. self._pool.close() # Wait for all worker processes to complete their tasks and exit. self._pool.join()
[docs] def print_profiling_report(self) -> None: """ Prints a formatted report of the time spent in different stages of the simulation if diagnostics were enabled. """ profiling_data = { "Integration": self._prof_integration, "Force Calculation": self._prof_force_calculation, "Collision Detection": self._prof_collision_detection, "Collision Response": self._prof_collision_response, } total_time = sum(profiling_data.values()) if total_time < constants.EPSILON: qDebug("\n--- Profiling Report (No data) ---") return report = "\n--- Profiling Report ---\n" for name, time_spent in profiling_data.items(): percentage = time_spent / total_time * 100 report += f"{name:<22} {time_spent:9.4f}s ({percentage:5.1f}%)\n" report += "-" * 40 + "\n" report += f"{'Total Instrumented Time':<22} {total_time:9.4f}s" qDebug(report)
def _calculate_energies(self) -> tuple[float, float, float]: # pylint: disable=too-many-locals """ Computes the total translational kinetic, rotational kinetic, and gravitational potential energy of the system using vectorized calculations. :returns: A tuple containing (translational KE, rotational KE, potential energy). :rtype: tuple[float, float, float] """ # Number of bodies in the space. num_bodies = len(self._bodies) if num_bodies == 0: return 0.0, 0.0, 0.0 # --- Kinetic Energies (Vectorized) --- # Translational KE = 0.5 * m * v^2 # We use einsum for an efficient, single-pass calculation of sum(m_i * v_i^2). ke_trans = 0.5 * np.einsum('i,ij,ij->', self._bodies.m, self._bodies.v, self._bodies.v) # Rotational KE = 0.5 * I * ω^2 # We only consider bodies with a valid moment of inertia (i_inv > 0). valid_inertia_mask = self._bodies.i_inv > 0 inertia = np.zeros_like(self._bodies.i_inv) inertia[valid_inertia_mask] = 1.0 / \ self._bodies.i_inv[valid_inertia_mask] omega_sq = np.sum(self._bodies.omega**2, axis=1) ke_rot = 0.5 * np.sum(inertia * omega_sq) # --- Potential Energy (Vectorized) --- # Potential Energy U = -G * m1 * m2 / r # We calculate this for all unique pairs of bodies. # Get upper triangle indices to select unique pairs (i, j) where i < j. i_indices, j_indices = np.triu_indices(num_bodies, k=1) # Calculate distance vectors and squared distances for all pairs at once. dist_vecs = self._bodies.x[j_indices] - self._bodies.x[i_indices] dist_sq = np.sum(dist_vecs**2, axis=1) # Filter out pairs that are too close to avoid division by zero. valid_dist_mask = dist_sq > self.__epsilon**2 dist = np.sqrt(dist_sq[valid_dist_mask]) # Calculate potential energy only for valid pairs and sum them up. m1 = self._bodies.m[i_indices][valid_dist_mask] m2 = self._bodies.m[j_indices][valid_dist_mask] potential = -self.__G * np.sum((m1 * m2) / dist) return ke_trans, ke_rot, potential def _resolve_interpenetration(self, b1: BodyProxy, b2: BodyProxy, overlap: float, n_vec: np.ndarray) -> None: """ Resolves interpenetration by moving bodies apart along the collision normal. **Why this is important for stability:** Due to discrete time steps in the simulation, bodies can slightly overlap (interpenetrate) before a collision is detected. If this overlap is not corrected, bodies can sink into each other, causing the collision response to apply massive, unrealistic forces in subsequent frames. This leads to numerical instability and "explosions" where energy is not conserved. This method applies a direct positional correction to move the bodies so they are just touching. This prevents the sinking problem and ensures that the impulse-based velocity response is calculated from a valid, non-overlapping state, which is crucial for a stable simulation. :param BodyProxy b1: First body. :param BodyProxy b2: Second body. :param float overlap: The amount of overlap. :param np.ndarray n_vec: The collision normal vector. """ # The total correction is distributed between the two bodies based on their inverse mass. # This means lighter bodies move more, and immovable bodies (inv_m=0) don't move at all. total_inv_mass = b1.inv_m + b2.inv_m if total_inv_mass < self.__epsilon: return # A small percentage of the overlap is corrected to avoid jittering. percent = 0.8 # 80% correction correction = (overlap * percent / total_inv_mass) * n_vec if b1.inv_m > 0: self._bodies.x[b1._index] -= correction * \ b1.inv_m # pylint: disable=protected-access if b2.inv_m > 0: self._bodies.x[b2._index] += correction * \ b2.inv_m # pylint: disable=protected-access def _get_effective_mass(self, ctx: CollisionContext, direction_vec: np.ndarray) -> float: """ Calculates the effective mass of two colliding bodies in a given direction. The "effective mass" represents the system's total resistance to acceleration along the specified direction, accounting for both linear and rotational inertia. :param CollisionContext ctx: The collision context. :param np.ndarray direction_vec: The direction vector (e.g., normal or tangent). :returns: The scalar effective mass. :rtype: float """ inv_mass_sum = ctx.b1.inv_m + ctx.b2.inv_m # Rotational inertia contribution for body 1: (r1 x n)^2 / I1 term1 = np.dot(np.cross(ctx.r1_vec, direction_vec), np.cross(ctx.r1_vec, direction_vec)) * ctx.b1.i_inv # Rotational inertia contribution for body 2: (r2 x n)^2 / I2 term2 = np.dot(np.cross(ctx.r2_vec, direction_vec), np.cross(ctx.r2_vec, direction_vec)) * ctx.b2.i_inv return 1.0 / (inv_mass_sum + term1 + term2) def _apply_restitution_impulse(self, ctx: CollisionContext) -> tuple[np.ndarray, float]: """ Calculates and applies the normal impulse (restitution) for a collision. :param CollisionContext ctx: The collision context. :returns: A tuple containing the impulse vector and its magnitude. :rtype: tuple[np.ndarray, float] """ v_rel_normal = np.dot(ctx.v_rel, ctx.n_vec) if v_rel_normal > 0: # Bodies are moving apart, no impulse needed. return np.zeros(3), 0.0 effective_mass_n = self._get_effective_mass(ctx, ctx.n_vec) jn = -(1.0 + constants.COEFF_RESTITUTION) * \ v_rel_normal * effective_mass_n jn = max(0, jn) return jn * ctx.n_vec, jn def _apply_friction_impulse(self, ctx: CollisionContext, jn: float, impulse_n: np.ndarray) -> np.ndarray: """ Calculates and applies the friction impulse for a collision. :param CollisionContext ctx: The collision context. :param float jn: The magnitude of the normal impulse, used for the friction limit. :param np.ndarray impulse_n: The normal impulse vector. :returns: The friction impulse vector. :rtype: np.ndarray """ # Calculate the relative velocity in the tangent plane. v_t = ctx.v_rel - np.dot(ctx.v_rel, ctx.n_vec) * ctx.n_vec v_t_norm = np.linalg.norm(v_t) # If there's no tangential velocity, there's no friction. if v_t_norm < self.__epsilon: return np.zeros(3) t_vec = v_t / v_t_norm effective_mass_t = self._get_effective_mass(ctx, t_vec) # Calculate the impulse required to stop tangential motion. jt = -np.dot(ctx.v_rel, t_vec) * effective_mass_t # Clamp the friction impulse to the friction cone (Coulomb's law). static_friction_limit = constants.COEFF_STATIC_FRICTION * jn jt = np.clip(jt, -static_friction_limit, static_friction_limit) return jt * t_vec def _handle_collision(self, b1: BodyProxy, b2: BodyProxy) -> None: """ Handle a collision between two bodies. :param BodyProxy b1: First body. :param BodyProxy b2: Second body. """ ctx = CollisionContext.from_bodies(b1, b2) if not ctx: return self._resolve_interpenetration(ctx.b1, ctx.b2, ctx.overlap, ctx.n_vec) impulse_n, jn = self._apply_restitution_impulse(ctx) if jn > 0: impulse_t = self._apply_friction_impulse(ctx, jn, impulse_n) total_impulse = impulse_n + impulse_t ctx.b1.apply_impulse(-total_impulse, ctx.r1_vec, self.__epsilon) ctx.b2.apply_impulse(total_impulse, ctx.r2_vec, self.__epsilon) def _color_graph(self) -> int: """ Implementation of a greedy graph colouring algorithm to partition the collision graph. This is a crucial step for safe collision resolution. By assigning a "color" to each body such that no two adjacent bodies (i.e., two bodies that are colliding) share the same colour, we can process collisions in stages. **Why this is useful for parallelism (or safe sequential processing):** All bodies of the same colour form an "independent set," meaning none of them are colliding with each other. Therefore, all collisions involving bodies of a specific colour can be processed simultaneously (or in a single sequential pass) without causing race conditions or conflicting updates. For example, when processing "color 0" bodies, we can handle collisions (0, 5) and (1, 8) at the same time, because we know that body 0 and body 1 are not colliding with each other. The algorithm iterates through each body, checks the colors of its neighbors in the collision graph, and assigns the smallest available colour to the current body. :returns: The total number of colours used (k), which corresponds to the number of sequential stages needed for collision resolution. :rtype: int """ # Number of bodies in the space. num_bodies: Final[int] = len(self._bodies) # Initialise all colours to -1 (uncoloured). self._colours = [-1] * num_bodies # Keep track of the highest colour index used so far. max_colour = 0 # Iterate through each body (node) in the graph. for i, neighbors in enumerate(self._collision_graph): # `used_colours` is a boolean array to mark which colours are already taken by # the neighbours of the current body `i`. The size is `max_colour + 2` to have # indices from 0 to `max_colour` and one extra slot for the next potential new colour. used_colours = [False] * (max_colour + 2) for neighbor_idx in neighbors: if self._colours[neighbor_idx] != -1: # Mark the neighbour’s colour as used. used_colours[self._colours[neighbor_idx]] = True # Find the first available colour (the smallest integer not used by neighbours). colour = 0 while colour < len(used_colours) and used_colours[colour]: colour += 1 # Assign the found colour to the current body. self._colours[i] = colour if colour > max_colour: max_colour = max(max_colour, colour) return max_colour + 1 def _log_system_energy(self, iteration: int) -> None: """ Logs the system's energy at specified intervals if diagnostics are enabled. :param int iteration: Current iteration number. """ # This diagnostic runs periodically to check for energy conservation, a key indicator of # simulation stability. It should only read data, not modify the simulation state. if not (iteration > 0 and iteration % 100 == 0): return ke_trans, ke_rot, potential = self._calculate_energies() total_ke = ke_trans + ke_rot current_total_energy = total_ke + potential qDebug(f"\n--- Iteration {iteration}: System Energy ---") qDebug(f" Translational KE: {ke_trans:12.4e}") qDebug(f" Rotational KE: {ke_rot:12.4e}") qDebug(f" Potential Energy: {potential:12.4e}") qDebug(f" Total Energy: {current_total_energy:12.4e}") qDebug(f" Time Step (dt): {self.__previous_dt:12.4e}") # A significant increase in total energy indicates a numerical instability, often caused # by the collision response. We log details to help diagnose the issue. # We allow for a small tolerance to account for floating-point inaccuracies. if current_total_energy > self._last_total_energy + self.__epsilon: warning_msg = "="*40 + "\n" warning_msg += (f" Total energy increased by " f"{current_total_energy - self._last_total_energy:.4e}\n") warning_msg += "="*40 qWarning(warning_msg) # The collision graph holds pairs from the *previous* step, which is what we want # to inspect as the likely cause of the energy increase. colliding_pairs = [] for i, neighbors in enumerate(self._collision_graph): for j in neighbors: if i < j: colliding_pairs.append((i, j)) if colliding_pairs: qWarning(" State of bodies from previous step's collision graph:") for i, j in colliding_pairs: b1, b2 = self._bodies[i], self._bodies[j] qWarning(f" Pair ({i}, {j}):") qWarning(f" Body {i}: v={b1.v}, omega={b1.omega}") qWarning(f" Body {j}: v={b2.v}, omega={b2.omega}") self._last_total_energy = current_total_energy def _compute_gravity(self, num_bodies: int) -> None: """ Computes gravitational forces for all bodies. :param int num_bodies: Number of bodies to consider. """ if num_bodies > 0: # Positions of all bodies. positions = self._bodies.x[:num_bodies] # Masses of all bodies. masses = self._bodies.m[:num_bodies] # Squared softening factor to avoid singularities. epsilon_sq = self.__epsilon**2 # Create chunks of work for each process # pylint: disable=protected-access # Size of each chunk of work for the multiprocessing pool. chunk_size = (num_bodies + self._pool._processes - 1) // self._pool._processes # List of tuples, where each tuple contains the arguments for `_calculate_force_chunk`. chunks = [ (slice(i, i + chunk_size), positions, masses, self.__G, epsilon_sq) for i in range(0, num_bodies, chunk_size) ] # List of acceleration arrays returned by the worker processes. results = self._pool.map(_calculate_force_chunk, chunks) self._bodies.a[:num_bodies] = np.vstack(results) def _build_collision_graph(self, num_bodies: int) -> None: # pylint: disable=too-many-locals """ Builds the collision graph using a two-stage broad-phase/narrow-phase strategy to efficiently detect all colliding pairs. :param int num_bodies: The number of bodies in the simulation. """ if num_bodies < 2: return # 1. Clear previous collision graph data. for neighbors in self._collision_graph: neighbors.clear() positions = self._bodies.x[:num_bodies] radii = self._bodies.r[:num_bodies] # --- Broad-Phase Collision Detection --- # The goal of the broad phase is to quickly eliminate pairs of bodies that are # definitely not colliding, creating a smaller list of "candidate" pairs for the # more expensive narrow-phase check. We use a k-d tree. # 1. Build two k-d trees on the body positions for efficient spatial queries. # Using two trees is a common pattern for this type of query. tree = cKDTree(positions) # 2. Query the k-d tree to find all potentially colliding pairs. # `query_pairs` finds all pairs of points whose distance is at most `r`. # We set `r` to the diameter of the largest possible body to ensure we don't # miss any potential collisions. `p=2.0` specifies Euclidean distance. max_radius = np.max(radii) if num_bodies > 0 else 0 # This returns a set of tuples, which we convert to a list of lists for processing. # We convert the set to a list for easier indexing. candidate_pairs = list(tree.query_pairs(r=max_radius * 2, p=2.0)) # --- Narrow-Phase Collision Detection --- # For the candidate pairs from the broad phase, perform a precise intersection test. # This is done in a vectorized manner for performance. if candidate_pairs: # Extract the indices of the first and second body in each pair. # `query_pairs` guarantees i < j for each pair (i, j). indices_i, indices_j = np.array(candidate_pairs).T # Perform the narrow-phase check for all candidate pairs at once. # 1. Calculate the vector difference between the positions of each pair. diffs = positions[indices_j] - positions[indices_i] # 2. Calculate the squared distance for each pair. dists_sq = np.sum(diffs**2, axis=1) # 3. Calculate the squared sum of radii for each pair. r_sums_sq = (radii[indices_i] + radii[indices_j])**2 # 4. Find the pairs that are actually colliding. colliding_mask = dists_sq < r_sums_sq for i, j in np.array(candidate_pairs)[colliding_mask]: # If they are colliding, add an e self._collision_graph[i].append(j) self._collision_graph[j].append(i) def _resolve_all_collisions(self, num_bodies: int) -> None: """ Resolves all collisions for the current step by colouring the collision graph and handling collisions in parallelizable stages. :param int num_bodies: The number of bodies in the simulation. """ # 1. Colour the collision graph to partition colliding bodies into independent sets. # This returns the total number of colours (k) needed. num_colours: Final[int] = self._color_graph() # 2. Process collisions in k stages, one for each colour. # This loop is the key to safe collision resolution. By handling all collisions # for one colour at a time, we ensure that we never simultaneously process two # collisions that share a body (e.g., A-B and A-C), which would lead to incorrect # results and race conditions. for c in range(num_colours): # In this stage, we find all bodies `i` that have the current colour `c`. for i in range(num_bodies): if self._colours[i] != c: continue # For each body `i` of colour `c`, we resolve its collisions with all its # neighbours `j`. Because of the graph colouring, we know that `_colours[j]` # will never be equal to `c`. for j in self._collision_graph[i]: # The `i < j` check ensures that we handle each collision pair only once. if i < j: self._handle_collision( self._bodies[i], self._bodies[j])
[docs] def compute_dynamics(self, iteration: int) -> None: """ Computes one step of the simulation, including dynamics and collisions. :param int iteration: Current iteration. """ # ====================================================================================== # --- 1. SETUP & LOGGING --- # ====================================================================================== self._current_iteration = iteration # Store iteration for detailed logging self._logged_collision_details = False # Reset log flag each frame self._log_system_energy(iteration) # Number of bodies in the space. num_bodies: Final[int] = len(self._bodies) # ====================================================================================== # --- 2. ADAPTIVE TIME-STEPPING --- # ====================================================================================== # Calculate a stable time step `dt` for the current frame based on the maximum # acceleration from the *previous* frame. # --- Adaptive Time Stepping --- # The time step for the current frame is calculated based on the state of the system # at the end of the *previous* frame. This makes the time step responsive to the # simulation's dynamics, ensuring stability during high-acceleration events (like # close encounters or collisions) while allowing for efficiency when bodies are far apart. max_a_sq = 0.0 if num_bodies > 0: # Find the maximum squared acceleration among all bodies from the previous step. max_a_sq = max(np.dot(body.a, body.a) for body in self._bodies) target_dt: float if max_a_sq > self.__epsilon**2: # Heuristic for a stable time step: dt should be small enough that a body with the # maximum acceleration does not travel more than a small target distance (TARGET_DX). # The formula is derived from d = 0.5*a*t^2 => t = sqrt(2*d/a). We use a simplified # version: dt = TARGET_DX / sqrt(max_a_sq). The time step is capped by MAX_DT. target_dt = min(constants.MAX_DT, constants.TARGET_DX / math.sqrt(max_a_sq)) else: # If accelerations are negligible, use the maximum allowed time step. target_dt = constants.MAX_DT # Smooth the change in dt using an exponential moving average (a low-pass filter). # This prevents rapid, large oscillations in the time step, which could introduce # its own numerical instability. We move a fraction of the way towards the target dt. damping_factor = 0.1 self.__dt = self.__dt * (1 - damping_factor) + \ target_dt * damping_factor # ====================================================================================== # --- 3. VELOCITY VERLET INTEGRATION --- # ====================================================================================== # The Velocity Verlet algorithm is split into two parts around the force calculation. # This structure provides better numerical stability than a simple Euler integration. start_time = time.perf_counter() # --- Part 1: Update positions and half-step velocities --- # To improve energy conservation with an adaptive timestep, we use a symmetric approach: # - The position update uses the time step from the previous frame (`dt_pos`). # - The velocity half-step uses the time step for the current frame (`dt`). self._bodies.integrate_part1(self.__dt, self.__previous_dt) self._prof_integration += time.perf_counter() - start_time # ====================================================================================== # --- 4. FORCE CALCULATION & COLLISION HANDLING --- # ====================================================================================== # Now that bodies are in their new positions, we calculate all forces and handle # interactions. start_time = time.perf_counter() # Find all pairs of colliding bodies. self._build_collision_graph(num_bodies) self._prof_collision_detection += time.perf_counter() - start_time start_time = time.perf_counter() # --- Collision Response --- # Resolve the detected collisions by applying impulses and positional corrections. self._resolve_all_collisions(num_bodies) self._prof_collision_response += time.perf_counter() - start_time start_time = time.perf_counter() # Reset accelerations self._bodies.a.fill(0) self._bodies.alpha.fill(0) # --- Gravitational Forces --- # Calculate the gravitational acceleration on each body. self._compute_gravity(num_bodies) self._prof_force_calculation += time.perf_counter() - start_time # --- Part 2: Complete the velocity integration for the CURRENT step --- self._bodies.integrate_part2(self.__dt) # ====================================================================================== # --- 5. CLEANUP --- # ====================================================================================== # Store the dt we just used for this frame's integration for the energy log. self.__previous_dt = self.__dt
@property def bodies(self) -> list[BodyProxy]: """ Provides an AoS-like list of body proxies for the display controller. :returns: List of body proxies. :rtype: list[BodyProxy] """ return [self._bodies[i] for i in range(len(self._bodies))]