#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# pylint: disable=too-many-lines
"""
The core physics engine of the simulation.
This module provides the `Space` class, which manages the simulation loop,
including gravity calculations, collision detection, and collision response for
all bodies.
:module: space
:author: Le Bars, Yoann
"""
# Defines the public names of this module.
__all__ = ['Space', 'CollisionContext']
from typing import Final
import time
import math
from dataclasses import dataclass
from multiprocessing import Pool, cpu_count
from scipy.spatial import cKDTree
import numpy as np
from PySide6.QtCore import qWarning, qDebug
from src.body import Bodies, BodyProxy
from src import constants
from src.config import SimulationConfig
@dataclass
class _ProfilingData:
"""A container for performance profiling timers."""
integration: float = 0.0
collision_detection: float = 0.0
collision_response: float = 0.0
force_calculation: float = 0.0
def total(self) -> float:
"""Returns the total instrumented time."""
return (self.integration + self.collision_detection + self.collision_response
+ self.force_calculation)
@dataclass
class CollisionContext:
"""
A data container to hold all relevant information for a single collision event.
This simplifies passing collision data between functions.
:ivar BodyProxy b1: First body.
:ivar BodyProxy b2: Second body.
:ivar np.ndarray n_vec: The collision normal vector.
:ivar np.ndarray r1_vec: The relative position vector from body 1.
:ivar np.ndarray r2_vec: The relative position vector from body 2.
:ivar np.ndarray v_rel: The relative velocity vector.
:ivar float overlap: The amount of overlap.
"""
b1: BodyProxy
b2: BodyProxy
n_vec: np.ndarray
r1_vec: np.ndarray
r2_vec: np.ndarray
v_rel: np.ndarray
overlap: float
@classmethod
def from_bodies(cls, b1: BodyProxy, b2: BodyProxy) -> 'CollisionContext | None':
"""
Factory method to create a CollisionContext if two bodies are colliding.
:param BodyProxy b1: First body.
:param BodyProxy b2: Second body.
:returns: A CollisionContext object if a collision occurs, otherwise None.
:rtype: CollisionContext | None
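Example (usage sketch; ``bodies`` is assumed to be an indexable SoA container whose
items are `BodyProxy` objects, mirroring how `Space._resolve_all_collisions` calls
this factory)::

    ctx = CollisionContext.from_bodies(bodies[i], bodies[j])
    if ctx is not None:
        ...  # the two bodies overlap; resolve the collision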
"""
d_vec = b2.x - b1.x
dist_sq = np.dot(d_vec, d_vec)
r_sum = b1.r + b2.r
if dist_sq >= r_sum**2 or dist_sq == 0:
return None
dist = math.sqrt(dist_sq)
n_vec = d_vec / dist
# r1_vec points from the centre of body 1 to the contact point.
r1_vec = b1.r * n_vec
# r2_vec points from the centre of body 2 to the contact point.
r2_vec = -b2.r * n_vec
v_rel = (b2.v + np.cross(b2.omega, r2_vec)) - \
(b1.v + np.cross(b1.omega, r1_vec))
overlap = (b1.r + b2.r) - dist
return cls(b1, b2, n_vec, r1_vec, r2_vec, v_rel, overlap)
class WorkerState:
"""
A simple class-based namespace to hold data for worker processes.
Using a class avoids the need for `global` statements, which Pylint discourages. This state is
initialised once per worker.
"""
bodies: Bodies | None = None
config: SimulationConfig | None = None
# A single instance of the state for each worker process.
_worker_state = WorkerState()
def _calculate_force_chunk(indices: slice, G: float, epsilon_sq: float) -> np.ndarray:
"""
Worker function to calculate gravitational forces for a subset of bodies using a fully
vectorised approach. This is a top-level function as required by `multiprocessing`.
The function calculates the gravitational acceleration exerted by all bodies in the
simulation on a specific subset (chunk) of bodies.
:param slice indices: A slice object defining the subset of bodies to calculate forces for.
:param float G: The universal gravitational constant.
:param float epsilon_sq: The squared softening factor to prevent singularities.
:returns: The calculated accelerations for the given indices.
:rtype: np.ndarray
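The implementation relies on the identity
``|x_i - x_j|^2 = |x_i|^2 + |x_j|^2 - 2 x_i . x_j`` to obtain all pairwise squared
distances without materialising the full difference tensor. A minimal NumPy sketch
of that identity (illustrative only, not part of the simulation)::

    rng = np.random.default_rng(0)
    a = rng.normal(size=(4, 3))    # e.g. chunk positions
    b = rng.normal(size=(6, 3))    # e.g. all positions
    brute = ((a[:, None, :] - b[None, :, :])**2).sum(axis=-1)
    fast = (a**2).sum(axis=1)[:, None] + (b**2).sum(axis=1) - 2.0 * a @ b.T
    assert np.allclose(brute, fast)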
"""
# The goal of this function is to calculate the gravitational acceleration on a subset
# of bodies (defined by `indices`) due to all other bodies in the simulation.
# It uses a fully vectorised approach for maximum performance.
#
# The core formula being implemented is Newton's Law of Universal Gravitation:
# F = G * m1 * m2 / r^2
# a = F / m1 = G * m2 / r^2 (acceleration on body 1 due to body 2)
#
# The function calculates `a` for each body in the `indices` range due to all other bodies.
if _worker_state.bodies is None or _worker_state.config is None:
raise RuntimeError(
"Worker process has not been initialised with simulation data.")
# This version avoids creating the large intermediate `diffs` array of shape
# (chunk_size, n_bodies, 3), which significantly reduces memory usage.
# 1. Extract the positions for the current chunk.
chunk_positions = _worker_state.bodies.x[indices]
# 2. Calculate squared distances between each body in the chunk and all other bodies.
# This is done efficiently using NumPy broadcasting and vectorisation.
#
# The goal is to compute a matrix `dists_sq` where `dists_sq[i, j]` is the squared
# distance between the i-th body in the chunk and the j-th body in the entire system.
#
# `dists_sq` shape: (num_bodies_in_chunk, total_num_bodies)
dists_sq = np.sum(chunk_positions**2, axis=1)[:, np.newaxis] + \
np.sum(_worker_state.bodies.x**2, axis=1) - \
2 * np.dot(chunk_positions, _worker_state.bodies.x.T)
# Apply a softening factor to avoid division by zero for bodies at the same position.
# The `epsilon_sq` value is used to prevent the force from
# becoming infinite when two bodies are very close to each other. This is a common
# technique in N-body simulations to improve stability.
dists_sq[dists_sq < epsilon_sq] = epsilon_sq
# 3. Calculate `G * m_j / d_ij^3` for each pair.
# This is the core of the gravitational force calculation.
#
# We want to calculate `force_magnitudes[i, j] = G × m_j / d_ij³`, where:
# - `G` is the gravitational constant.
# - `m_j` is the mass of the j-th body.
# - `d_ij` is the distance between the i-th body in the chunk and the j-th body.
#
# We perform the calculation in the following steps:
# a. Calculate `1 / d_ij³` in-place by raising `dists_sq` to the power of -1.5.
# b. Multiply by `G` and the mass of each body (`m_j`) to get the force magnitudes.
#
dists_sq **= -1.5
force_magnitudes = G * _worker_state.bodies.m[np.newaxis, :] * dists_sq
# A body does not exert a gravitational force on itself, so we explicitly zero the
# self-interaction entries. Note that for a chunk starting at `indices.start`, the
# self-term of row `i` sits in column `indices.start + i`, so the zeroed entries must
# be offset accordingly rather than taken from the main diagonal.
# The acceleration formula below cancels this term anyway (because `x_i - x_i = 0`),
# but being explicit makes the code more robust and easier to understand.
rows = np.arange(chunk_positions.shape[0])
force_magnitudes[rows, rows + indices.start] = 0.0
# 4. Calculate the final acceleration vectors.
# The acceleration on body `i` is `a_i = Σ_j G·m_j·(x_j - x_i) / |r_ij|³`. With
# `F_ij = G·m_j / |r_ij|³` this expands to `a_i = (Σ_j F_ij·x_j) - (Σ_j F_ij)·x_i`,
# which we evaluate without ever storing the full `diffs` array:
# `force_magnitudes @ _worker_state.bodies.x` gives `Σ_j F_ij·x_j`, and
# `np.sum(force_magnitudes, axis=1)[:, np.newaxis] * chunk_positions` gives
# `(Σ_j F_ij)·x_i`.
accel = (force_magnitudes @ _worker_state.bodies.x) - \
np.sum(force_magnitudes, axis=1)[:, np.newaxis] * chunk_positions
return accel
def _init_worker(bodies: Bodies, config: SimulationConfig) -> None:
"""
Initialiser for each worker process in the multiprocessing pool.
This function stores references to the shared simulation data on the module-level
`_worker_state` instance inside each worker's memory space. The data is sent to the
workers once, at pool creation, instead of being pickled and re-sent for every task.
:param Bodies bodies: The main SoA container with all body data.
:param SimulationConfig config: The simulation configuration object.
"""
_worker_state.bodies = bodies
_worker_state.config = config
# pylint: disable=too-many-instance-attributes
class Space:
"""
Class describing a space in which several bodies move.
:ivar float _dt: Model time step (in s).
:ivar float _previous_dt: Previous model time step (in s).
:ivar SimulationConfig _config: The simulation configuration object.
:ivar Bodies _bodies: The main SoA container.
:ivar _ProfilingData _profiling: A container for performance profiling timers.
:ivar list[list[int]] _collision_graph: Graph of collisions.
:ivar list[int] _colours: Colours used for spatialised collision handling.
:ivar Pool _pool: A pool of worker processes for parallel computation.
:ivar int _num_processes: The number of worker processes in the pool.
:ivar float _last_total_energy: The total energy from the previous logged step.
:ivar int _current_iteration: Helper for detailed logging.
:ivar bool _logged_collision_details: Helper for detailed logging.
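Example (hedged sketch; constructing a :class:`SimulationConfig` depends on the
project's configuration loading and is assumed here, as are the step count and
initial time step)::

    space = Space(config, initial_dt=1.0e-3)
    for step in range(num_steps):
        space.compute_dynamics(step)
    space.cleanup()
    space.print_profiling_report()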
"""
_dt: float
_previous_dt: float
_config: SimulationConfig
_bodies: Bodies
_profiling: _ProfilingData
# Adjacency list for the collision graph
_collision_graph: list[list[int]]
_colours: list[int] # Node colours for graph colouring
_pool: Pool
_num_processes: int
_last_total_energy: float | None
_current_iteration: int
_logged_collision_details: bool
def _generate_non_overlapping_positions(self, n: int, radii: np.ndarray,
dens: float,
rng: np.random.Generator) -> np.ndarray:
"""
Generates non-overlapping positions using Poisson Disk Sampling.
This method uses a fast, NumPy-based implementation of Bridson's algorithm to
guarantee that no two body centres are closer than the sum of their radii,
producing a more uniform and stable initial distribution than random placement.
The placement volume is scaled with the number of bodies to maintain consistent
density, matching the C++ implementation.
:param int n: The number of positions to generate.
:param np.ndarray radii: An array of radii for each of the bodies.
:param float dens: Body density.
:param np.random.Generator rng: The random number generator to use.
:returns: An array of up to `n` non-overlapping positions; fewer are returned if
the algorithm runs out of room.
:rtype: np.ndarray
"""
# Calculate placement scale based on number of bodies and average radius,
# matching the C++ implementation's approach.
# Average radius is approximately (MIN_MASS + MAX_MASS) / 2 * dens
avg_mass = (constants.MIN_MASS + constants.MAX_MASS) / 2.0
avg_radius = avg_mass * dens
placement_scale = np.cbrt(float(n)) * avg_radius * constants.PLACEMENT_SCALE_FACTOR
# Domain is a cube centered at origin with side length 2 * placement_scale
domain_size = 2.0 * placement_scale
dims = np.array([domain_size] * 3)
# The minimum distance between points is based on the largest radius.
# This ensures even the largest bodies will not overlap in the initial placement.
min_dist = 2.0 * np.max(radii)
# Cell size for the acceleration grid. With a side of `min_dist / sqrt(3)`, the
# diagonal of a cell equals `min_dist`, so each cell can hold at most one sample.
cell_size = min_dist / np.sqrt(3)
grid_shape = np.ceil(dims / cell_size).astype(int)
grid = -np.ones(grid_shape, dtype=int)
# --- Algorithm Setup ---
# This array will store the final, valid positions.
points = np.zeros((n, 3))
# This list holds the indices of "active" points to generate new candidates from.
active_list = []
points_count = 0
# Number of attempts to find a valid point around an active point.
k_rejection_samples = 30
# --- 1. Start with a single random point ---
initial_point = rng.uniform(0, dims) - dims / 2
initial_idx = 0
grid_idx = (initial_point + dims / 2) / cell_size
grid[tuple(grid_idx.astype(int))] = initial_idx
points[initial_idx] = initial_point
active_list.append(initial_idx)
points_count += 1
# --- 2. Generate subsequent points ---
while active_list and points_count < n:
active_idx = rng.choice(active_list)
active_point = points[active_idx]
found_candidate = False
for _ in range(k_rejection_samples):
# Generate a random candidate point in the annulus around the active point.
angle = rng.uniform(0, 2 * np.pi, 2)
radius = rng.uniform(min_dist, 2.0 * min_dist)
offset = np.array([radius * np.sin(angle[0]) * np.cos(angle[1]),
radius *
np.sin(angle[0]) * np.sin(angle[1]),
radius * np.cos(angle[0])])
candidate = active_point + offset
# Check that the candidate lies within the domain and is far enough from every
# existing point. Bridson's algorithm must inspect the surrounding grid cells,
# not just the candidate's own cell: a point in an adjacent cell can still be
# closer than `min_dist`.
if np.all(np.abs(candidate) < dims / 2):
grid_pos = ((candidate + dims / 2) / cell_size).astype(int)
lo = np.maximum(grid_pos - 2, 0)
hi = np.minimum(grid_pos + 3, grid_shape)
nearby = grid[lo[0]:hi[0], lo[1]:hi[1], lo[2]:hi[2]]
nearby = nearby[nearby != -1]
if grid[tuple(grid_pos)] == -1 and (nearby.size == 0 or np.all(
np.sum((points[nearby] - candidate)**2, axis=1) >= min_dist**2)):
points[points_count] = candidate
grid[tuple(grid_pos)] = points_count
active_list.append(points_count)
points_count += 1
found_candidate = True
# Found a valid point, move to the next active point.
break
if not found_candidate:
active_list.remove(active_idx)
return points[:points_count]
def _resize_structures(self, new_size: int) -> None:
"""
Resizes internal data structures to a new size.
:param int new_size: The new number of bodies.
"""
self._bodies = Bodies(new_size)
self._collision_graph = [[] for _ in range(new_size)]
self._colours = [-1] * new_size
def _initialize_bodies(self, n: int, dens: float, seed: int) -> None:
"""
Initialises the physical properties and positions of all bodies.
:param int n: Number of bodies.
:param float dens: Body density.
:param int seed: Seed for random number generation.
"""
rng = np.random.default_rng(seed if seed != 0 else None)
# --- 1. Generate Ideal Physical Properties ---
# Generate properties for the requested number of bodies `n`.
masses = rng.uniform(constants.MIN_MASS, constants.MAX_MASS, n)
radii = masses * dens
velocities = rng.uniform(
constants.MIN_VELOCITY, constants.MAX_VELOCITY, (n, 3))
angular_velocities = rng.uniform(
-constants.MAX_INITIAL_ANGULAR_VELOCITY, constants.MAX_INITIAL_ANGULAR_VELOCITY, (n, 3))
# --- 2. Generate Valid Non-Overlapping Positions ---
positions = self._generate_non_overlapping_positions(n, radii, dens, rng)
n_placed = len(positions)
# --- 3. Handle Cases Where Not All Bodies Could Be Placed ---
# If Poisson Disk Sampling couldn't find space for all bodies (e.g., due to
# high density), we adjust the simulation to use only the bodies that were placed.
if n_placed < n:
qWarning(
f"Failed to generate {n} non-overlapping positions. "
f"The density may be too high. Using {n_placed} bodies instead."
)
# Slice all property arrays to match the number of successfully placed bodies.
masses = masses[:n_placed]
radii = radii[:n_placed]
velocities = velocities[:n_placed]
angular_velocities = angular_velocities[:n_placed]
# --- 4. Assign Properties to Bodies Container (Fully Vectorised) ---
self._bodies = Bodies(n_placed)
self._bodies.m[:n_placed] = masses
self._bodies.r[:n_placed] = radii
self._bodies.x[:n_placed] = positions
self._bodies.v[:n_placed] = velocities
self._bodies.omega[:n_placed] = angular_velocities
# Add epsilon to prevent division by zero for bodies with zero mass.
self._bodies.inv_m[:n_placed] = 1.0 / (masses + constants.EPSILON)
inertia = 0.4 * masses * radii**2
self._bodies.i_inv[:n_placed] = 1.0 / (inertia + constants.EPSILON)
self._bodies.set_size(n_placed)
def _warm_up_simulation(self) -> None:
"""
Runs a few "warm-up" iterations to ensure a stable start.
**Problem:** The simulation starts with zero acceleration. The adaptive time-stepper
calculates the `dt` for the current frame based on accelerations from the *previous*
frame. On the first real frame, it would see zero acceleration and incorrectly choose
a large `dt`, potentially causing an unstable "explosion" if bodies start close together.
**Solution:** This method runs the core dynamics calculation a few times. This populates
the acceleration arrays and allows the adaptive `dt` to converge to a safe, stable
value *before* the main simulation loop begins.
"""
# Store the real dt and use a tiny one to kickstart the process safely.
original_dt = self._dt
self._dt = self._previous_dt = self._config.epsilon * 0.1
for _ in range(5):
# This computes forces and updates `self._dt` via the adaptive algorithm.
self.compute_dynamics(0)
# Restore the last calculated stable dt, but cap it at the original initial dt
# to prevent the first visible step from being unexpectedly large.
self._dt = self._previous_dt = min(self._dt, original_dt)
def __init__(self, config: SimulationConfig, initial_dt: float):
"""
Initialises the simulation space.
:param SimulationConfig config: The configuration object for the simulation.
:param float initial_dt: The initial time step for the simulation.
"""
self._config = config
self._dt = initial_dt
self._previous_dt = initial_dt
self._bodies = Bodies(self._config.n_bodies)
self._current_iteration = 0
self._logged_collision_details = False
self._collision_graph = [[] for _ in range(self._config.n_bodies)]
self._colours = [-1] * self._config.n_bodies
self._last_total_energy = None
self._num_processes = cpu_count()
self._initialize_bodies(
self._config.n_bodies, self._config.dens, self._config.seed)
self._profiling = _ProfilingData()
# `_last_total_energy` was initialised to None above so that the first energy check
# records a baseline instead of reporting a spurious increase.
# --- Multiprocessing Pool Initialisation ---
# The pool is created *after* initialisation so that child processes can inherit
# the large data arrays via the initialiser.
# R1732 is a false positive here. The pool’s lifecycle is tied to the Space object’s
# lifetime, not a smaller ‘with’ block. Cleanup is handled in the `cleanup()` method.
self._pool = Pool(processes=self._num_processes, initializer=_init_worker, # pylint: disable=consider-using-with
initargs=(self._bodies, self._config))
# --- Adaptive Time-step Warm-up ---
self._warm_up_simulation()
def cleanup(self) -> None:
"""
Clean up resources, such as closing the multiprocessing pool.
"""
# Close the pool to prevent any new tasks from being submitted.
self._pool.close()
# Wait for all worker processes to complete their tasks and exit.
self._pool.join()
def print_profiling_report(self) -> None:
"""
Prints a formatted report of the time spent in different stages of the simulation
if diagnostics are enabled.
"""
total_time = self._profiling.total()
if total_time < self._config.epsilon:
qDebug("\n--- Profiling Report (No data) ---")
return
report = "\n--- Profiling Report ---\n"
profiling_data = self._profiling.__dict__
for name, time_spent in profiling_data.items():
percentage = time_spent / total_time * 100
report += f"{name:<22} {time_spent:9.4f}s ({percentage:5.1f}%)\n"
report += "-" * 40 + "\n"
report += f"{'Total Instrumented Time':<22} {total_time:9.4f}s"
qDebug(report)
def _calculate_energies(self) -> tuple[float, float, float]:
"""
Computes the total translational kinetic, rotational kinetic, and gravitational
potential energy of the system using vectorised calculations.
:returns: A tuple containing (translational KE, rotational KE, potential energy).
:rtype: tuple[float, float, float]
"""
# Number of bodies in the simulation space.
num_bodies = len(self._bodies)
if num_bodies == 0:
return 0.0, 0.0, 0.0
# --- Kinetic Energies (Vectorised) ---
# Translational KE = 0.5 * m * v^2
# We use einsum for an efficient, single-pass calculation of sum(m_i * v_i^2).
ke_trans = 0.5 * np.einsum('i,ij,ij->', self._bodies.m,
self._bodies.v, self._bodies.v)
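# The einsum above is equivalent to the more explicit (but slower, allocation-heavy)
# form: 0.5 * np.sum(self._bodies.m * np.sum(self._bodies.v**2, axis=1)).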
# Rotational KE = 0.5 * I * ω^2 = 0.5 * (1/i_inv) * ω^2
# This calculates KE for all bodies. For bodies with i_inv=0, this results
# in `inf * 0`, which is NaN. `nansum` correctly sums only the valid numbers.
omega_sq = np.sum(self._bodies.omega**2, axis=1)
with np.errstate(divide='ignore', invalid='ignore'):
ke_rot = 0.5 * np.nansum(omega_sq / self._bodies.i_inv)
# --- Potential Energy (Vectorised) ---
# Potential Energy U = -G * m1 * m2 / r
# We calculate this for all unique pairs of bodies (where i < j).
# Get upper triangle indices to select unique pairs (i, j) where i < j.
i_indices, j_indices = np.triu_indices(num_bodies, k=1)
# Calculate squared distances for all unique pairs.
dist_vecs = self._bodies.x[j_indices] - self._bodies.x[i_indices]
dist_sq = np.sum(dist_vecs**2, axis=1)
# Calculate the potential energy for all pairs.
# We add a small softening factor (epsilon^2) to the denominator to prevent
# division by zero and to handle close encounters more stably.
inv_dist = 1.0 / np.sqrt(dist_sq + self._config.epsilon**2)
potential = -self._config.universal_g * np.sum(
self._bodies.m[i_indices] * self._bodies.m[j_indices] * inv_dist)
return ke_trans, ke_rot, potential
def _get_effective_mass(self, ctx: CollisionContext, direction_vec: np.ndarray) -> float:
"""
Calculates the effective mass of two colliding bodies in a given direction.
The "effective mass" represents the system's total resistance to acceleration along
the specified direction, accounting for both linear and rotational inertia.
:param CollisionContext ctx: The collision context.
:param np.ndarray direction_vec: The direction vector (e.g., normal or tangential).
:returns: The scalar effective mass.
:rtype: float
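The quantity computed below is::

    m_eff = 1 / ( 1/m1 + 1/m2
                  + ((r1 x d) . (r1 x d)) / I1
                  + ((r2 x d) . (r2 x d)) / I2 + epsilon )

where ``d`` is ``direction_vec`` and ``epsilon`` is a small regularisation term.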
"""
inv_mass_sum = ctx.b1.inv_m + ctx.b2.inv_m
# Rotational inertia contribution for body 1: (r1 x n)^2 / I1
term1 = np.dot(np.cross(ctx.r1_vec, direction_vec),
np.cross(ctx.r1_vec, direction_vec)) * ctx.b1.i_inv
# Rotational inertia contribution for body 2: (r2 x n)^2 / I2
term2 = np.dot(np.cross(ctx.r2_vec, direction_vec),
np.cross(ctx.r2_vec, direction_vec)) * ctx.b2.i_inv
return 1.0 / (inv_mass_sum + term1 + term2 + self._config.epsilon)
def _apply_restitution_impulse(self, ctx: CollisionContext) -> tuple[np.ndarray, float]:
"""
Calculates and applies the normal impulse (restitution) for a collision.
:param CollisionContext ctx: The collision context.
:returns: A tuple containing the impulse vector and its magnitude.
:rtype: tuple[np.ndarray, float]
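In scalar form, the normal impulse computed below is::

    j_n = max(0, (-(1 + e) * (v_rel . n) + bias) * m_eff)

where ``e`` is the coefficient of restitution and ``bias`` is the Baumgarte
positional-correction term.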
"""
v_rel_normal = np.dot(ctx.v_rel, ctx.n_vec)
effective_mass_n = self._get_effective_mass(ctx, ctx.n_vec)
# --- Baumgarte Stabilisation ---
# Add a "bias" term to the impulse to correct for positional error (penetration).
# This term creates a small impulse that aims to resolve the overlap over the
# next time step, preventing the bodies from sinking into each other.
bias = (self._config.positional_correction_factor /
self._previous_dt) * max(0, ctx.overlap - 0.01)
# Impulse for restitution (velocity change)
jn_vel = -(1.0 + self._config.coeff_restitution) * v_rel_normal
# Impulse for positional correction (bias)
jn_bias = bias
jn = (jn_vel + jn_bias) * effective_mass_n
# The impulse must be repulsive, so it cannot be negative.
jn = max(0.0, jn)
return jn * ctx.n_vec, jn
def _apply_friction_impulse(self, ctx: CollisionContext, jn: float,
impulse_n: np.ndarray) -> np.ndarray:
"""
Calculates and applies the friction impulse for a collision.
:param CollisionContext ctx: The collision context.
:param float jn: The magnitude of the normal impulse, used for the friction limit.
:param np.ndarray impulse_n: The normal impulse vector.
:returns: The friction impulse vector.
:rtype: np.ndarray
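The tangential impulse magnitude is clamped by Coulomb's friction cone::

    j_t = j_required           if j_required < mu_static * j_n   (sticking)
    j_t = mu_kinetic * j_n     otherwise                         (sliding)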
"""
# Friction must act on the relative velocity *after* the restitution impulse has
# been applied. Only the linear part of that velocity change is included here; the
# impulse's effect on the angular velocities is neglected in this estimate.
v_rel_after_restitution = ctx.v_rel + \
impulse_n * (ctx.b1.inv_m + ctx.b2.inv_m)
# Calculate the tangential component of the new relative velocity.
v_t = v_rel_after_restitution - \
np.dot(v_rel_after_restitution, ctx.n_vec) * ctx.n_vec
v_t_norm = np.linalg.norm(v_t)
# If there's no tangential velocity, there's no friction.
if v_t_norm < self._config.epsilon:
return np.zeros(3)
t_vec = v_t / v_t_norm
effective_mass_t = self._get_effective_mass(ctx, t_vec)
# Calculate the magnitude of the impulse required to stop the tangential motion.
# The impulse vector required to stop tangential motion is -v_t × effective_mass_t.
# The magnitude of this impulse is v_t_norm * effective_mass_t.
jt_required_magnitude = v_t_norm * effective_mass_t
# Clamp the friction impulse to the friction cone (Coulomb's law).
# If the required impulse is within the static friction limit, apply it.
# Otherwise, apply the kinetic friction impulse.
static_friction_limit = self._config.coeff_static_friction * jn
if jt_required_magnitude < static_friction_limit:
jt = jt_required_magnitude
else:
kinetic_friction_limit = self._config.coeff_friction * jn
jt = kinetic_friction_limit
# The friction impulse vector opposes the direction of tangential motion.
return jt * -t_vec
def _colour_graph(self) -> int:
"""
Implementation of a greedy graph colouring algorithm to partition the collision graph.
This is a crucial step for safe collision resolution. By assigning a "colour" to each
body such that no two adjacent bodies (i.e., two bodies that are colliding) share the
same colour, we can process collisions in stages.

**Why this is useful for parallelism (or safe sequential processing):**
All bodies of the same colour form an "independent set", meaning none of them are
colliding with each other. Therefore, all collisions involving bodies of a specific
colour can be processed simultaneously (or in a single sequential pass) without causing
race conditions or conflicting updates. For example, when processing "colour 0" bodies,
# we can handle collisions (0, 5) and (1, 8) at the same time because we know that
body 0 and body 1 are not colliding with each other.
The algorithm iterates through each body, checks the colours of its neighbours in the
collision graph, and assigns the smallest available colour to the current body.
:returns: The total number of colours used (k), which corresponds to the number of
sequential stages needed for collision resolution.
:rtype: int
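Worked example (illustrative)::

    # Collision graph: 0-1 and 1-2 (bodies 0 and 2 do not touch).
    # Node 1 has the highest degree and is coloured first: colour(1) = 0.
    # Nodes 0 and 2 each neighbour a colour-0 node, so colour(0) = colour(2) = 1.
    # The method returns max(colours) + 1 = 2 sequential stages.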
"""
num_bodies: Final[int] = len(self._bodies)
self._colours = [-1] * num_bodies
if num_bodies == 0:
return 0
# Process nodes in descending order of their degree for a potentially better colouring.
# This is a common heuristic for greedy colouring algorithms.
node_order = np.argsort(
[-len(self._collision_graph[i]) for i in range(num_bodies)])
for i in node_order:
# Find the set of colours used by neighbours of the current node.
neighbour_colours = {self._colours[j] for j in self._collision_graph[i]
if self._colours[j] != -1}
# Find the smallest non-negative integer colour not in use by neighbours.
# This is a fast, set-based way to find the "first available" colour.
c = 0
while c in neighbour_colours:
c += 1
self._colours[i] = c
# The number of stages required is one more than the highest colour index used.
return max(self._colours) + 1 if self._colours else 0
def _log_system_energy(self, iteration: int) -> None:
"""
Logs the system's energy at specified intervals if diagnostics are enabled.
:param int iteration: Current iteration number.
"""
# This diagnostic runs periodically to check for energy conservation, a key indicator of
# simulation stability. It should only read data, not modify the simulation state.
if not (iteration > 0 and iteration % self._config.log_freq == 0):
return
ke_trans, ke_rot, potential = self._calculate_energies()
total_ke = ke_trans + ke_rot
current_total_energy = total_ke + potential
qDebug(f"\n--- Iteration {iteration}: System Energy ---")
qDebug(f" Translational KE: {ke_trans:12.4e}")
qDebug(f" Rotational KE: {ke_rot:12.4e}")
qDebug(f" Potential Energy: {potential:12.4e}")
qDebug(f" Total Energy: {current_total_energy:12.4e}")
qDebug(f" Time Step (dt): {self._previous_dt:12.4e}")
# On the first run, just store the energy and return.
if self._last_total_energy is None:
self._last_total_energy = current_total_energy
return
# A significant increase in total energy indicates a numerical instability, often caused
# by the collision response. We log details to help diagnose the issue.
# We allow for a small tolerance to account for floating-point inaccuracies.
if current_total_energy > self._last_total_energy + self._config.epsilon:
warning_msg = f"\n{'=' * 40}\n"
warning_msg += (f" Total energy increased by "
f"{current_total_energy - self._last_total_energy:.4e}\n")
warning_msg += "="*40
qWarning(warning_msg)
# The collision graph holds pairs from the *previous* step, which is what we want
# to inspect as the likely cause of the energy increase.
colliding_pairs = []
for i, neighbours in enumerate(self._collision_graph):
for j in neighbours:
if i < j:
colliding_pairs.append((i, j))
if colliding_pairs:
qWarning(" State of bodies from previous step's collision graph:")
for i, j in colliding_pairs:
b1, b2 = self._bodies[i], self._bodies[j]
qWarning(f" Pair ({i}, {j}):")
qWarning(f" Body {i}: v={b1.v}, omega={b1.omega}")
qWarning(f" Body {j}: v={b2.v}, omega={b2.omega}")
self._last_total_energy = current_total_energy
def _compute_gravity(self, num_bodies: int) -> np.ndarray:
"""
Computes gravitational forces for all bodies.
:param int num_bodies: Number of bodies to consider.
:returns: An array of acceleration vectors for each body.
:rtype: np.ndarray
"""
if num_bodies == 0:
return np.zeros((0, 3))
epsilon_sq = self._config.epsilon**2
# For fewer than 250 bodies, the overhead of multiprocessing is not worth it, so we
# fall back (at the end of this method) to a single, fully vectorised calculation,
# which is still significantly faster than a Python loop.
if num_bodies >= 250:
# For a large number of bodies, we parallelise the calculation across
# multiple processes to leverage all available CPU cores.
# 1. Create chunks of work for each process.
chunk_size = (num_bodies + self._num_processes -
1) // self._num_processes
chunks = [slice(i, i + chunk_size)
for i in range(0, num_bodies, chunk_size)]
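# For example, with 1000 bodies and 4 processes: chunk_size = 250 and the chunks
# are slice(0, 250), slice(250, 500), slice(500, 750) and slice(750, 1000).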
# 2. Use starmap to pass multiple arguments to the worker function.
results = self._pool.starmap(_calculate_force_chunk, [ # type: ignore
(chunk, self._config.universal_g, epsilon_sq) for chunk in chunks])
return np.vstack(results)
# Single-process path: initialise the worker state here, since the worker function
# reads its data from it.
_init_worker(self._bodies, self._config)
return _calculate_force_chunk(
slice(0, num_bodies), self._config.universal_g, epsilon_sq)
def _build_collision_graph(self, num_bodies: int) -> None: # pylint: disable=too-many-locals
"""
Builds the collision graph using a two-stage broad-phase/narrow-phase strategy
to efficiently detect all colliding pairs.
:param int num_bodies: The number of bodies in the simulation.
"""
if num_bodies < 2:
return
# 1. Clear previous collision graph data.
for neighbors in self._collision_graph:
neighbors.clear()
positions = self._bodies.x[:num_bodies]
radii = self._bodies.r[:num_bodies]
# --- Broad-Phase Collision Detection ---
# The goal of the broad phase is to quickly eliminate pairs of bodies that are
# definitely not colliding, creating a smaller list of "candidate" pairs for the
# more expensive narrow-phase check. We use a k-d tree.
# 1. Build a k-d tree on the body positions for efficient spatial queries.
tree = cKDTree(positions)
# 2. Query the tree to find all potentially colliding pairs.
# For each body `i`, `query_ball_point` returns the neighbours within a radius of
# `radii[i] + max_radius`, which is the largest distance at which body `i` could
# possibly collide with any other body. `p=2.0` specifies the Euclidean distance.
# This per-body radius yields a much tighter candidate set than querying every
# body with a single global radius of `2 * max_radius` (e.g. via `query_pairs`),
# which is inefficient when body sizes vary widely.
max_radius = np.max(radii) if num_bodies > 0 else 0
all_neighbors = tree.query_ball_point(
positions, r=radii + max_radius, p=2.0)
# The query results include self-pairs (i, i) and duplicate pairs (i, j) and (j, i).
# We use a set to efficiently store unique pairs where i < j.
candidate_pairs_set = set()
for i, neighbors in enumerate(all_neighbors):
for j in neighbors:
if i < j:
candidate_pairs_set.add((i, j))
# --- Narrow-Phase Collision Detection ---
# For the candidate pairs from the broad phase, perform a precise intersection test.
# This is done in a vectorised manner for performance.
if candidate_pairs_set:
indices_i, indices_j = np.array(list(candidate_pairs_set)).T
diffs = positions[indices_j] - positions[indices_i]
dists_sq = np.sum(diffs**2, axis=1)
r_sums_sq = (radii[indices_i] + radii[indices_j])**2
colliding_mask = dists_sq < r_sums_sq
# Directly access the colliding pairs using the boolean mask.
actual_colliding_pairs = np.array(
list(candidate_pairs_set))[colliding_mask]
for i, j in actual_colliding_pairs:
self._collision_graph[i].append(j)
self._collision_graph[j].append(i)
def _resolve_all_collisions(self, num_bodies: int) -> None:
"""
Resolves all collisions for the current step by colouring the collision
graph and handling collisions in parallelizable stages.
:param int num_bodies: The number of bodies in the simulation.
"""
# 1. Colour the collision graph to partition colliding bodies into independent sets.
# This returns the total number of colours (k) needed.
num_colours = self._colour_graph()
if num_colours == 0:
return
# 2. Group bodies by their assigned colour for efficient processing.
# This avoids iterating through all bodies for each colour.
bodies_by_colour = [[] for _ in range(num_colours)]
for i in range(num_bodies):
if self._colours[i] != -1:
bodies_by_colour[self._colours[i]].append(i)
# 3. Process collisions in `k` stages, one for each colour.
# This loop is the key to safe collision resolution. All bodies of the same
# colour are non-adjacent, so their collisions can be resolved in parallel.
for k in range(num_colours):
for i in bodies_by_colour[k]:
for j in self._collision_graph[i]:
if i < j:
ctx = CollisionContext.from_bodies(
self._bodies[i], self._bodies[j])
if not ctx:
continue
impulse_n, jn = self._apply_restitution_impulse(ctx)
if jn > self._config.epsilon:
impulse_t = self._apply_friction_impulse(
ctx, jn, impulse_n)
total_impulse = impulse_n + impulse_t
# Apply impulses to both bodies in a single vectorised call.
self._bodies.apply_impulses(
indices=np.array([i, j]),
impulses=np.array(
[-total_impulse, total_impulse]),
contact_vectors=np.array([ctx.r1_vec, ctx.r2_vec]))
def _update_adaptive_dt(self) -> None:
"""
Calculates a stable time step `dt` for the next frame based on the maximum
acceleration from the current frame.
This makes the time step responsive to the simulation's dynamics, ensuring stability
during high-acceleration events (like close encounters) while allowing for efficiency
when bodies are far apart.
Note: This method follows the same logic as the C++ implementation. It stores
the current dt as previous_dt before calculating the new dt.
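For example, the linear-acceleration constraint alone gives (see the body of the
method)::

    dt_a = sqrt(2 * target_dx / max(|a|))

and the final ``dt`` is an exponential moving average of the most restrictive
constraint, the target itself being capped at ``max_dt``.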
"""
# Store the dt we just used for this frame's integration.
# This matches the C++ implementation's behavior.
self._previous_dt = self._dt
num_bodies = len(self._bodies)
if num_bodies == 0:
self._dt = self._config.max_dt
return
# We work with squared values as long as possible to improve numerical stability
# and avoid unnecessary square root operations.
max_dt_sq = self._config.max_dt**2
# --- 1. Linear Acceleration Constraint ---
max_accel_sq = np.max(np.sum(self._bodies.a**2, axis=1))
dt_a_sq: float
if max_accel_sq > self._config.epsilon**2:
# Constraint derived from d = 0.5*a*t^2, solved for t^2: t^2 = 2*d / a.
# Since we only have the squared acceleration, we use |a| = sqrt(a^2):
# dt_a_sq = t^2 = 2*d / sqrt(a^2), which avoids mixing squared and
# non-squared quantities.
dt_a_sq = (2.0 * self._config.target_dx) / math.sqrt(max_accel_sq)
else:
dt_a_sq = max_dt_sq
# --- 2. Linear Velocity Constraint (CFL Condition) ---
# This is a Courant-Friedrichs-Lewy (CFL) condition, ensuring a body does not
# travel more than a fraction of its own radius in a single step.
vel_sq = np.sum(self._bodies.v**2, axis=1)
dt_v_sq: float
# Vectorize the calculation: dt_v_sq_i = (C*r_i)^2 / v_i^2 for each body.
# Then find the minimum of these values. This avoids the `argmax` lookup.
with np.errstate(divide='ignore', invalid='ignore'):
# The condition is: v_i × dt < C × r_i => dt² < (C×r_i)² / v_i²
# We use a safety factor C=0.5.
dt_v_sq_all = (0.5 * self._bodies.r)**2 / \
(vel_sq + self._config.epsilon**2)
dt_v_sq = np.min(dt_v_sq_all)
# --- 3. Rotational Constraints ---
# Prevents a body from rotating too far in a single step.
max_omega_sq = np.max(np.sum(self._bodies.omega**2, axis=1))
max_alpha_sq = np.max(np.sum(self._bodies.alpha**2, axis=1))
# Angular velocity constraint: omega * dt < max_angle
dt_omega_sq: float
if max_omega_sq > self._config.epsilon**2:
# Max allowed rotation in radians (approx. 5.7 degrees)
max_angle = 0.1
dt_omega_sq = max_angle**2 / max_omega_sq
else:
dt_omega_sq = max_dt_sq
# Angular acceleration constraint: 0.5 × alpha × dt² < max_angle
dt_alpha_sq: float
if max_alpha_sq > self._config.epsilon**2:
max_angle = 0.1
dt_alpha_sq = (2.0 * max_angle) / math.sqrt(max_alpha_sq)
else:
dt_alpha_sq = max_dt_sq
# --- 4. Combine Constraints and Apply Damping ---
# The final target dt is the square root of the minimum of the squared constraints.
target_dt = math.sqrt(min(
max_dt_sq,
dt_a_sq, dt_v_sq,
dt_omega_sq, dt_alpha_sq
) + self._config.epsilon)
# Smooth the change in dt using an exponential moving average (a low-pass filter)
# to prevent rapid oscillations in the time step.
self._dt = self._dt * (1 - self._config.dt_damping_factor) + \
target_dt * self._config.dt_damping_factor
def compute_dynamics(self, iteration: int) -> None:
"""
Computes one step of the simulation, including dynamics and collisions.
This method follows the same order of operations as the C++ implementation
to ensure consistent behavior and valid benchmark comparisons.
:param int iteration: Current iteration.
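The per-step order of operations is::

    integrate positions (part 1) -> detect collisions -> resolve collisions
    -> update adaptive dt -> recompute forces -> integrate velocities (part 2)
    -> apply damping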
"""
# ======================================================================================
# --- 1. SETUP & LOGGING ---
# ======================================================================================
# Store iteration for detailed logging.
self._current_iteration = iteration
self._log_system_energy(iteration)
# Number of bodies in the space.
num_bodies: Final[int] = len(self._bodies)
# ======================================================================================
# --- 2. VELOCITY VERLET INTEGRATION (Part 1) ---
# ======================================================================================
# Update positions based on the previous frame's state.
# Capture dt before it may be modified by _update_adaptive_dt().
# Both integrate_part1 and integrate_part2 must use the same dt for the
# Velocity Verlet scheme to be correct.
current_dt = self._previous_dt
start_time = time.perf_counter()
# --- Part 1: Update positions and half-step velocities ---
# The entire integration for this frame must use a single, consistent time step.
# We use `_previous_dt` (the dt calculated at the end of the last frame) for both
# parts of the Verlet integration to ensure energy conservation.
self._bodies.integrate_part1(self._previous_dt)
self._profiling.integration += time.perf_counter() - start_time
# ======================================================================================
# --- 3. COLLISION DETECTION ---
# ======================================================================================
start_time = time.perf_counter()
# Find all pairs of colliding bodies.
self._build_collision_graph(num_bodies)
self._profiling.collision_detection += time.perf_counter() - start_time
# ======================================================================================
# --- 4. COLLISION RESPONSE ---
# ======================================================================================
start_time = time.perf_counter()
# Resolve the detected collisions by applying impulses and positional corrections.
self._resolve_all_collisions(num_bodies)
self._profiling.collision_response += time.perf_counter() - start_time
# ======================================================================================
# --- 5. ADAPTIVE TIME-STEPPING ---
# ======================================================================================
# Calculate dt for the next frame based on the current maximum acceleration.
# This is done after collisions so that the adaptive time step considers
# the full state of the system from the previous frame (including collision effects).
self._update_adaptive_dt()
# ======================================================================================
# --- 6. FORCE CALCULATION ---
# ======================================================================================
start_time = time.perf_counter()
# Reset accelerations
self._bodies.a.fill(0)
self._bodies.torque.fill(0)
self._bodies.alpha.fill(0)
# --- Gravitational Forces ---
# Calculate and apply the gravitational acceleration on each body.
gravity_accels = self._compute_gravity(num_bodies)
self._bodies.a[:num_bodies] += gravity_accels
self._profiling.force_calculation += time.perf_counter() - start_time
# --- Convert Torque to Angular Acceleration ---
# alpha = torque * I_inv. We do this after all torques for the frame are accumulated.
self._bodies.alpha[:] = self._bodies.torque * \
self._bodies.i_inv[:, np.newaxis]
# ======================================================================================
# --- 7. VELOCITY VERLET INTEGRATION (Part 2) ---
# ======================================================================================
# Complete the velocity update using the new accelerations.
# Use the same dt that was used in integrate_part1 to maintain Velocity
# Verlet correctness.
self._bodies.integrate_part2(current_dt)
# ======================================================================================
# --- 8. DAMPING ---
# ======================================================================================
# Apply damping directly to the entire velocity arrays. This is significantly
# faster than iterating through each body.
self._bodies.v *= (1.0 - self._config.linear_damping)
self._bodies.omega *= (1.0 - self._config.angular_damping)
# Note: _previous_dt is already updated in _update_adaptive_dt() to match
# the C++ implementation's behavior. No need to update it here.
@property
def bodies(self) -> list[BodyProxy]:
"""
Provides an AoS-like list of body proxies for the display controller.
:returns: List of body proxies.
:rtype: list[BodyProxy]
"""
return [self._bodies[i] for i in range(len(self._bodies))]
def get_body_data_for_display(self) -> tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
"""
Provides current simulation data required for display as NumPy arrays.
This is optimised for efficient data transfer to the UI thread.
:returns: A tuple containing (positions, quaternions, torques, angular_accelerations).
:rtype: tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]
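Example (``space`` being a :class:`Space` instance)::

    positions, quaternions, torques, alphas = space.get_body_data_for_display()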
"""
n = len(self._bodies) # Use len(self._bodies) directly
return (self._bodies.x[:n], self._bodies.q[:n],
self._bodies.torque[:n], self._bodies.alpha[:n])