#!/usr/bin/env python3
# pylint: disable=too-many-lines
# -*- coding: utf-8 -*-
"""
The core physics engine of the simulation.
This module provides the `Space` class, which manages the simulation loop,
including gravity calculations, collision detection, and collision response for
all bodies.
:module: space
:author: Le Bars, Yoann
"""
# Defines the public names of this module.
__all__ = ['Space', 'CollisionContext']
from typing import Final
import time
import math
from dataclasses import dataclass
from itertools import compress
from multiprocessing import Pool, cpu_count
from scipy.spatial import cKDTree
import numpy as np
from PySide6.QtCore import qWarning, qDebug
from src.body import Bodies, BodyProxy
from src.config import SimulationConfig
@dataclass
class _ProfilingData:
"""A container for performance profiling timers."""
integration: float = 0.0
collision_detection: float = 0.0
collision_response: float = 0.0
force_calculation: float = 0.0
def total(self) -> float:
"""Returns the total instrumented time."""
return self.integration + self.collision_detection + self.collision_response \
+ self.force_calculation
[docs]
@dataclass
class CollisionContext:
"""
A data container to hold all relevant information for a single collision event.
This simplifies passing collision data between functions.
:ivar BodyProxy b1: First body.
:ivar BodyProxy b2: Second body.
:ivar np.ndarray n_vec: The collision normal vector.
:ivar np.ndarray r1_vec: The relative position vector from body 1.
:ivar np.ndarray r2_vec: The relative position vector from body 2.
:ivar np.ndarray v_rel: The relative velocity vector.
:ivar float overlap: The amount of overlap.
"""
b1: BodyProxy
b2: BodyProxy
n_vec: np.ndarray
r1_vec: np.ndarray
r2_vec: np.ndarray
v_rel: np.ndarray
overlap: float
[docs]
@classmethod
def from_bodies(cls, b1: BodyProxy, b2: BodyProxy) -> 'CollisionContext | None':
"""
Factory method to create a CollisionContext if two bodies are colliding.
:param BodyProxy b1: First body.
:param BodyProxy b2: Second body.
:returns: A CollisionContext object if a collision occurs, otherwise None.
:rtype: CollisionContext | None
"""
d_vec = b2.x - b1.x
dist_sq = np.dot(d_vec, d_vec)
r_sum = b1.r + b2.r
if dist_sq >= r_sum**2 or dist_sq == 0:
return None
dist = math.sqrt(dist_sq)
n_vec = d_vec / dist
# r1_vec points from the centre of body 1 to the contact point.
r1_vec = b1.r * n_vec
# r2_vec points from the centre of body 2 to the contact point.
r2_vec = -b2.r * n_vec
v_rel = (b2.v + np.cross(b2.omega, r2_vec)) - \
(b1.v + np.cross(b1.omega, r1_vec))
overlap = (b1.r + b2.r) - dist
return cls(b1, b2, n_vec, r1_vec, r2_vec, v_rel, overlap)
class WorkerState:
"""
A simple class-based namespace to hold data for worker processes.
Using a class avoids the need for `global` statements, which Pylint discourages. This state is
initialised once per worker.
"""
bodies: Bodies | None = None
config: SimulationConfig | None = None
# A single instance of the state for each worker process.
_worker_state = WorkerState()
def _calculate_force_chunk(indices: slice, G: float, epsilon_sq: float) -> np.ndarray:
"""
Worker function to calculate gravitational forces for a subset of bodies using a fully
vectorised approach. This is a top-level function as required by `multiprocessing`.
The function calculates the gravitational acceleration exerted by all bodies in the
simulation on a specific subset (chunk) of bodies.
:param slice indices: A slice object defining the subset of bodies to calculate forces for.
:param float G: The universal gravitational constant.
:param float epsilon_sq: The squared softening factor to prevent singularities.
:returns: The calculated accelerations for the given indices.
:rtype: np.ndarray
"""
# The goal of this function is to calculate the gravitational acceleration on a subset
# of bodies (defined by `indices`) due to all other bodies in the simulation.
# It uses a fully vectorised approach for maximum performance.
#
# The core formula being implemented is Newton's Law of Universal Gravitation:
# F = G * m1 * m2 / r^2
# a = F / m1 = G * m2 / r^2 (acceleration on body 1 due to body 2)
#
# The function calculates `a` for each body in the `indices` range due to all other bodies.
if _worker_state.bodies is None or _worker_state.config is None:
raise RuntimeError(
"Worker process has not been initialised with simulation data.")
# This version avoids creating the large intermediate `diffs` array of shape
# (chunk_size, n_bodies, 3), significantly reducing memory usage.
# 1. Extract the positions for the current chunk.
chunk_positions = _worker_state.bodies.x[indices]
# 2. Calculate squared distances between each body in the chunk and all other bodies.
# This is done efficiently using NumPy broadcasting and vectorisation.
#
# The goal is to compute a matrix `dists_sq` where `dists_sq[i, j]` is the squared
# distance between the i-th body in the chunk and the j-th body in the entire system.
#
# `dists_sq` shape: (num_bodies_in_chunk, total_num_bodies)
dists_sq = np.sum(chunk_positions**2, axis=1)[:, np.newaxis] + \
np.sum(_worker_state.bodies.x**2, axis=1) - \
2 * np.dot(chunk_positions, _worker_state.bodies.x.T)
# Apply softening factor to avoid division by zero for bodies at the same position.
# The `epsilon_sq` value is added to the squared distances to prevent the force from
# becoming infinite when two bodies are very close to each other. This is a common
# technique in N-body simulations to improve stability.
dists_sq[dists_sq < epsilon_sq] = epsilon_sq
# 3. Calculate `G * m_j / d_ij^3` for each pair.
# This is the core of the gravitational force calculation.
#
# We want to calculate `force_magnitudes[i, j] = G * m_j / d_ij^3`, where:
# - `G` is the gravitational constant.
# - `m_j` is the mass of the j-th body.
# - `d_ij` is the distance between the i-th body in the chunk and the j-th body.
#
# We perform the calculation in the following steps:
# a. Calculate `1 / d_ij^3` in-place by raising `dists_sq` to the power of -1.5.
# b. Multiply by `G` and the mass of each body (`m_j`) to get the force magnitudes.
#
dists_sq **= -1.5
force_magnitudes = G * _worker_state.bodies.m[np.newaxis, :] * dists_sq
# A body does not exert a gravitational force on itself. We must explicitly
# zero out the diagonal of the force matrix.
if indices.start == 0 and indices.stop == len(_worker_state.bodies):
# np.fill_diagonal only works on contiguous memory slices, so we must check the indices.
# This is only true when processing the entire matrix.
np.fill_diagonal(force_magnitudes, 0)
# 4. Calculate the final acceleration vectors.
# We compute the sum of `force_magnitudes * (all_pos - chunk_pos)`
# without storing the full `diffs` array.
# `force_magnitudes @ _worker_state.positions` computes the sum of `F_ij * pos_j`.
# `np.sum(force_magnitudes, axis=1)[:, np.newaxis] * chunk_positions` computes `sum(F_ij) * pos_i`.
# The difference gives the total acceleration vector.
#
# The formula for acceleration on body `i` is `a_i = Σ G * m_j * (x_j - x_i) / |r_ij|^3`.
# This can be expanded to `a_i = (Σ G*m_j/|r_ij|^3 * x_j) - (Σ G*m_j/|r_ij|^3) * x_i`.
#
# Let `F_ij = G*m_j/|r_ij|^3`. The formula is `a_i = (Σ F_ij * x_j) - (Σ F_ij) * x_i`.
#
# The code implements this as:
# `accel = (force_magnitudes @ _worker_state.bodies.x) - np.sum(force_magnitudes, axis=1)[:, np.newaxis] * chunk_positions`
#
accel = (force_magnitudes @ _worker_state.bodies.x) - \
np.sum(force_magnitudes, axis=1)[:, np.newaxis] * chunk_positions
return accel
def _init_worker(bodies: Bodies, config: SimulationConfig) -> None:
"""
Initialiser for each worker process in the multiprocessing pool.
This function sets the large NumPy arrays as global variables within each
worker's memory space. This avoids the overhead of pickling and sending
these large arrays for every task.
:param Bodies bodies: The main SoA container with all body data.
:param SimulationConfig config: The simulation configuration object.
"""
_worker_state.bodies = bodies
_worker_state.config = config
# pylint: disable=too-many-instance-attributes
[docs]
class Space:
"""
Class describing a space in which move several bodies.
:ivar float _dt: Model time step (in s).
:ivar float _previous_dt: Previous model time step (in s).
:ivar SimulationConfig _config: The simulation configuration object.
:ivar Bodies _bodies: The main SoA container.
:ivar _ProfilingData _profiling: A container for performance profiling timers.
:ivar float _last_total_energy: The total energy from the previous logged step.
:ivar int _current_iteration: Helper for detailed logging.
:ivar bool _logged_collision_details: Flag to prevent re-logging details in a single frame.
:ivar list[list[int]] _collision_graph: Graph of collisions.
:ivar list[int] _colours: Colours used for spatialised collision handling.
:ivar Pool _pool: A pool of worker processes for parallel computation.
:ivar int _num_processes: The number of worker processes in the pool.
:ivar float _last_total_energy: The total energy from the previous logged step.
:ivar int _current_iteration: Helper for detailed logging.
:ivar bool _logged_collision_details: Helper for detailed logging.
"""
_dt: float
_previous_dt: float
_config: SimulationConfig
_bodies: Bodies
_profiling: _ProfilingData
# Adjacency list for the collision graph
_collision_graph: list[list[int]]
_colours: list[int] # Node colours for graph colouring
_pool: Pool
_num_processes: int
_last_total_energy: float | None
_current_iteration: int
_logged_collision_details: bool
def _generate_non_overlapping_positions(self, n: int, radii: np.ndarray,
rng: np.random.Generator) -> np.ndarray:
"""
Generates `n` non-overlapping positions for bodies with the given radii.
This method uses an iterative vectorised filtering approach, which is significantly
faster than placing bodies one by one. It starts by generating an excess of
candidate positions and then iteratively finds and removes any that overlap.
:param int n: The desired number of positions to generate.
:param np.ndarray radii: An array of radii for each of the `n` bodies.
:param np.random.Generator rng: The random number generator to use.
:returns: An array of `n` non-overlapping positions.
:rtype: np.ndarray
"""
# 1. Start with a number of candidates slightly larger than needed to have a buffer.
num_candidates = int(n * 1.2)
positions = rng.uniform(-100.0, 100.0, (num_candidates, 3))
max_radius = np.max(radii)
# Maximum number of filtering iterations to prevent an infinite loop.
max_iterations: Final[int] = 100
# 2. Iteratively find and replace overlapping positions.
for _ in range(max_iterations):
# --- Broad Phase: Find potential overlaps with a k-d tree ---
tree = cKDTree(positions)
pairs = tree.query_pairs(r=max_radius * 2, p=2.0)
if not pairs:
break # No potential overlaps found, the set is clean.
indices_to_remove = self._get_colliding_indices(
list(pairs), positions, radii, n
)
if indices_to_remove.size == 0:
break # No actual overlaps found, the set is clean.
# --- Filter and Replace ---
positions = np.delete(positions, indices_to_remove, axis=0)
num_needed = num_candidates - len(positions)
new_positions = rng.uniform(-100.0, 100.0, (num_needed, 3))
positions = np.vstack((positions, new_positions))
# 3. Return the first `n` non-overlapping positions.
return positions[:n]
def _get_colliding_indices(self, candidate_pairs: list[tuple[int, int]],
positions: np.ndarray, radii: np.ndarray, n_bodies: int) -> np.ndarray:
"""
Performs a narrow-phase check on candidate pairs to find actual overlaps.
:param list[tuple[int, int]] candidate_pairs: A list of index pairs from the broad phase.
:param np.ndarray positions: The array of all body positions.
:param np.ndarray radii: The array of all body radii.
:param int n_bodies: The total number of bodies.
:returns: An array of unique indices of bodies that are colliding.
:rtype: np.ndarray
"""
p1_indices, p2_indices = np.array(list(candidate_pairs)).T
diffs = positions[p1_indices] - positions[p2_indices]
dists_sq = np.sum(diffs**2, axis=1)
# Use modulo to map candidate indices back to the original body radius indices.
r_sums_sq = (radii[p1_indices % n_bodies] +
radii[p2_indices % n_bodies])**2
colliding_mask = dists_sq < r_sums_sq
return np.unique(p2_indices[colliding_mask])
def _resize_structures(self, new_size: int) -> None:
"""
Resizes internal data structures to a new size.
:param int new_size: The new number of bodies.
"""
self._bodies = Bodies(new_size)
self._collision_graph = [[] for _ in range(new_size)]
self._colours = [-1] * new_size
def _initialize_bodies(self, n: int, dens: float, seed: int) -> None:
"""
Initialises the physical properties and positions of all bodies.
:param int n: Number of bodies.
:param float dens: Body density.
:param int seed: Seed for random number generation.
"""
rng = np.random.default_rng(seed if seed != 0 else None)
# --- 1. Generate Physical Properties (Fully Vectorised) ---
masses = rng.uniform(25.0, 75.0, n)
radii = masses * dens
velocities = rng.uniform(-5.0, 5.0, (n, 3))
positions = self._generate_non_overlapping_positions(n, radii, rng)
# --- 3. Finalize and Emplace Bodies ---
if len(positions) < n:
qWarning(
f"Failed to generate {n} non-overlapping positions. "
f"iterations. The density may be too high. Using {len(positions)} bodies instead."
)
self._resize_structures(len(positions))
final_positions = positions
for i, pos in enumerate(final_positions):
self._bodies.emplace_back(
masses[i], radii[i], pos, velocities[i])
def _warm_up_simulation(self) -> None:
"""
Runs a few "warm-up" iterations to ensure a stable start.
**Problem:** The simulation starts with zero acceleration. The adaptive time-stepper
calculates the `dt` for the current frame based on accelerations from the *previous*
frame. On the first real frame, it would see zero acceleration and incorrectly choose
a large `dt`, potentially causing an unstable "explosion" if bodies start close together.
**Solution:** This method runs the core dynamics calculation a few times. This populates
the acceleration arrays and allows the adaptive `dt` to converge to a safe, stable
value *before* the main simulation loop begins.
"""
# Store the real dt and use a tiny one to kickstart the process safely.
original_dt = self._dt
self._dt = self._previous_dt = self._config.epsilon * 0.1
for _ in range(5):
# This computes forces and updates `self._dt` via the adaptive algorithm.
self.compute_dynamics(0)
# Restore the last calculated stable dt, but cap it at the original initial dt
# to prevent the first visible step from being unexpectedly large.
self._dt = self._previous_dt = min(self._dt, original_dt)
def __init__(self, config: SimulationConfig, initial_dt: float):
"""
Initialises the simulation space.
:param SimulationConfig config: The configuration object for the simulation.
:param float initial_dt: The initial time step for the simulation.
"""
self._config = config
self._dt = initial_dt
self._previous_dt = initial_dt
self._bodies = Bodies(self._config.n_bodies)
self._current_iteration = 0
self._logged_collision_details = False
self._collision_graph = [[] for _ in range(self._config.n_bodies)]
self._colours = [-1] * self._config.n_bodies
self._last_total_energy = None
self._num_processes = cpu_count()
self._initialize_bodies(
self._config.n_bodies, self._config.dens, self._config.seed)
self._profiling = _ProfilingData()
# Initialize with a very high value to avoid false positive on first check
self._last_total_energy = None
# --- Multiprocessing Pool Initialisation ---
# The pool is created *after* initialisation so that child processes can inherit
# the large data arrays via the initialiser.
# R1732 is a false positive here. The pool’s lifecycle is tied to the Space object’s
# lifetime, not a smaller ‘with’ block. Cleanup is handled in the `cleanup()` method.
self._pool = Pool(processes=self._num_processes, initializer=_init_worker, # pylint: disable=consider-using-with
initargs=(self._bodies, self._config))
# --- Adaptive Time-step Warm-up ---
self._warm_up_simulation()
[docs]
def cleanup(self) -> None:
"""
Clean up resources, such as closing the multiprocessing pool.
"""
# Close the pool to prevent any new tasks from being submitted.
self._pool.close()
# Wait for all worker processes to complete their tasks and exit.
self._pool.join()
[docs]
def print_profiling_report(self) -> None:
"""
Prints a formatted report of the time spent in different stages of the simulation
if diagnostics were enabled.
"""
total_time = self._profiling.total()
if total_time < self._config.epsilon:
qDebug("\n--- Profiling Report (No data) ---")
return
report = "\n--- Profiling Report ---\n"
profiling_data = self._profiling.__dict__
for name, time_spent in profiling_data.items():
percentage = time_spent / total_time * 100
report += f"{name:<22} {time_spent:9.4f}s ({percentage:5.1f}%)\n"
report += "-" * 40 + "\n"
report += f"{'Total Instrumented Time':<22} {total_time:9.4f}s"
qDebug(report)
def _calculate_energies(self) -> tuple[float, float, float]:
"""
Computes the total translational kinetic, rotational kinetic, and gravitational
potential energy of the system using vectorised calculations.
:returns: A tuple containing (translational KE, rotational KE, potential energy).
:rtype: tuple[float, float, float]
"""
# Number of bodies in the space.
num_bodies = len(self._bodies)
if num_bodies == 0:
return 0.0, 0.0, 0.0
# --- Kinetic Energies (Vectorised) ---
# Translational KE = 0.5 * m * v^2
# We use einsum for an efficient, single-pass calculation of sum(m_i * v_i^2).
ke_trans = 0.5 * np.einsum('i,ij,ij->', self._bodies.m,
self._bodies.v, self._bodies.v)
# Rotational KE = 0.5 * I * ω^2 = 0.5 * (1/i_inv) * ω^2
# This calculates KE for all bodies. For bodies with i_inv=0, this results
# in `inf * 0`, which is NaN. `nansum` correctly sums only the valid numbers.
omega_sq = np.sum(self._bodies.omega**2, axis=1)
with np.errstate(divide='ignore', invalid='ignore'):
ke_rot = 0.5 * np.nansum(omega_sq / self._bodies.i_inv)
# --- Potential Energy (Vectorised) ---
# Potential Energy U = -G * m1 * m2 / r
# We calculate this for all unique pairs of bodies (where i < j).
# Get upper triangle indices to select unique pairs (i, j) where i < j.
i_indices, j_indices = np.triu_indices(num_bodies, k=1)
# Calculate squared distances for all unique pairs.
dist_vecs = self._bodies.x[j_indices] - self._bodies.x[i_indices]
dist_sq = np.sum(dist_vecs**2, axis=1)
# Calculate the potential energy for all pairs.
# We add a small softening factor (epsilon^2) to the denominator to prevent
# division by zero and to handle close encounters more stably.
inv_dist = 1.0 / np.sqrt(dist_sq + self._config.epsilon**2)
potential = -self._config.universal_g * np.sum(
self._bodies.m[i_indices] * self._bodies.m[j_indices] * inv_dist)
return ke_trans, ke_rot, potential
def _get_effective_mass(self, ctx: CollisionContext, direction_vec: np.ndarray) -> float:
"""
Calculates the effective mass of two colliding bodies in a given direction.
The "effective mass" represents the system's total resistance to acceleration along
the specified direction, accounting for both linear and rotational inertia.
:param CollisionContext ctx: The collision context.
:param np.ndarray direction_vec: The direction vector (e.g., normal or tangent).
:returns: The scalar effective mass.
:rtype: float
"""
inv_mass_sum = ctx.b1.inv_m + ctx.b2.inv_m
# Rotational inertia contribution for body 1: (r1 x n)^2 / I1
term1 = np.dot(np.cross(ctx.r1_vec, direction_vec),
np.cross(ctx.r1_vec, direction_vec)) * ctx.b1.i_inv
# Rotational inertia contribution for body 2: (r2 x n)^2 / I2
term2 = np.dot(np.cross(ctx.r2_vec, direction_vec),
np.cross(ctx.r2_vec, direction_vec)) * ctx.b2.i_inv
return 1.0 / (inv_mass_sum + term1 + term2 + self._config.epsilon)
def _apply_restitution_impulse(self, ctx: CollisionContext) -> tuple[np.ndarray, float]:
"""
Calculates and applies the normal impulse (restitution) for a collision.
:param CollisionContext ctx: The collision context.
:returns: A tuple containing the impulse vector and its magnitude.
:rtype: tuple[np.ndarray, float]
"""
v_rel_normal = np.dot(ctx.v_rel, ctx.n_vec)
effective_mass_n = self._get_effective_mass(ctx, ctx.n_vec)
# --- Baumgarte Stabilisation ---
# Add a "bias" term to the impulse to correct for positional error (penetration).
# This term creates a small impulse that aims to resolve the overlap over the
# next time step, preventing the bodies from sinking into each other.
bias = (self._config.positional_correction_factor /
self._previous_dt) * max(0, ctx.overlap - 0.01)
# Impulse for restitution (velocity change)
jn_vel = -(1.0 + self._config.coeff_restitution) * v_rel_normal
# Impulse for positional correction (bias)
jn_bias = bias
jn = (jn_vel + jn_bias) * effective_mass_n
jn = max(0, jn)
return jn * ctx.n_vec, jn
def _apply_friction_impulse(self, ctx: CollisionContext, jn: float,
impulse_n: np.ndarray) -> np.ndarray:
"""
Calculates and applies the friction impulse for a collision.
:param CollisionContext ctx: The collision context.
:param float jn: The magnitude of the normal impulse, used for the friction limit.
:param np.ndarray impulse_n: The normal impulse vector.
:returns: The friction impulse vector.
:rtype: np.ndarray
"""
# To correctly calculate friction, we must use the relative velocity *after* the
# restitution impulse has been applied.
v_rel_after_restitution = ctx.v_rel + \
impulse_n * (ctx.b1.inv_m + ctx.b2.inv_m)
# Calculate the tangential component of the new relative velocity.
v_t = v_rel_after_restitution - \
np.dot(v_rel_after_restitution, ctx.n_vec) * ctx.n_vec
v_t_norm = np.linalg.norm(v_t)
# If there's no tangential velocity, there's no friction.
if v_t_norm < self._config.epsilon:
return np.zeros(3)
t_vec = v_t / v_t_norm
effective_mass_t = self._get_effective_mass(ctx, t_vec)
# Calculate the magnitude of the impulse required to stop the tangential motion.
# The impulse vector will be in the direction opposite to t_vec.
jt_required = np.dot(v_rel_after_restitution, t_vec) * effective_mass_t
# Clamp the friction impulse to the friction cone (Coulomb's law).
# If the required impulse is within the static friction limit, apply it.
# Otherwise, apply the kinetic friction impulse.
static_friction_limit = self._config.coeff_static_friction * jn
if jt_required < static_friction_limit:
jt = jt_required
else:
kinetic_friction_limit = self._config.coeff_friction * jn
jt = kinetic_friction_limit
# The friction impulse vector opposes the direction of tangential motion.
return -jt * t_vec
def _resolve_collision_pair(self, i: int, j: int) -> None:
"""
Resolves a single collision between bodies `i` and `j`.
This method calculates and applies the necessary impulses to handle restitution
(bounciness) and friction for a single colliding pair.
:param int i: Index of the first body.
:param int j: Index of the second body.
"""
# --- Optimisation: Skip pairs of two immovable bodies ---
# If both bodies have infinite mass, no collision response is possible.
if self._bodies.inv_m[i] == 0 and self._bodies.inv_m[j] == 0:
return
# Create the collision context from the current (potentially overlapping) state.
# This captures the correct relative velocity at the moment of impact.
ctx = CollisionContext.from_bodies(self._bodies[i], self._bodies[j])
if not ctx:
return
# Calculate and apply the normal impulse (restitution + positional correction).
impulse_n, jn = self._apply_restitution_impulse(ctx)
# Only apply friction if there is a non-trivial normal impulse.
if jn > self._config.epsilon:
impulse_t = self._apply_friction_impulse(ctx, jn, impulse_n)
total_impulse = impulse_n + impulse_t
ctx.b1.apply_impulse(-total_impulse,
ctx.r1_vec, self._config.epsilon)
ctx.b2.apply_impulse(
total_impulse, ctx.r2_vec, self._config.epsilon)
def _colour_graph(self) -> int:
"""
Implementation of a greedy graph colouring algorithm to partition the collision graph.
This is a crucial step for safe collision resolution. By assigning a "color" to each
body such that no two adjacent bodies (i.e., two bodies that are colliding) share the
same colour, we can process collisions in stages.
#
**Why this is useful for parallelism (or safe sequential processing):**
All bodies of the same colour form an "independent set," meaning none of them are
colliding with each other. Therefore, all collisions involving bodies of a specific
colour can be processed simultaneously (or in a single sequential pass) without causing
race conditions or conflicting updates. For example, when processing "colour 0" bodies,
# we can handle collisions (0, 5) and (1, 8) at the same time because we know that
body 0 and body 1 are not colliding with each other.
The algorithm iterates through each body, checks the colours of its neighbours in the
collision graph, and assigns the smallest available colour to the current body.
:returns: The total number of colours used (k), which corresponds to the number of
sequential stages needed for collision resolution.
:rtype: int
"""
num_bodies: Final[int] = len(self._bodies)
self._colours = [-1] * num_bodies
if num_bodies == 0:
return 0
# Process nodes in descending order of their degree for a potentially better colouring.
# This is a common heuristic for greedy colouring algorithms.
node_order = np.argsort(
[-len(self._collision_graph[i]) for i in range(num_bodies)])
for i in node_order:
# Find the set of colours used by neighbours of the current node.
neighbour_colours = {self._colours[j] for j in self._collision_graph[i]
if self._colours[j] != -1}
# Find the smallest non-negative integer colour not in use by neighbours.
# This is a fast, set-based way to find the "first available" color.
c = 0
while c in neighbour_colours:
c += 1
self._colours[i] = c
# The number of stages required is one more than the highest colour index used.
return max(self._colours) + 1 if self._colours else 0
def _log_system_energy(self, iteration: int) -> None:
"""
Logs the system's energy at specified intervals if diagnostics are enabled.
:param int iteration: Current iteration number.
"""
# This diagnostic runs periodically to check for energy conservation, a key indicator of
# simulation stability. It should only read data, not modify the simulation state.
if not (iteration > 0 and iteration % self._config.log_freq == 0):
return
ke_trans, ke_rot, potential = self._calculate_energies()
total_ke = ke_trans + ke_rot
current_total_energy = total_ke + potential
qDebug(f"\n--- Iteration {iteration}: System Energy ---")
qDebug(f" Translational KE: {ke_trans:12.4e}")
qDebug(f" Rotational KE: {ke_rot:12.4e}")
qDebug(f" Potential Energy: {potential:12.4e}")
qDebug(f" Total Energy: {current_total_energy:12.4e}")
qDebug(f" Time Step (dt): {self._previous_dt:12.4e}")
# On the first run, just store the energy and return.
if self._last_total_energy is None:
self._last_total_energy = current_total_energy
return
# A significant increase in total energy indicates a numerical instability, often caused
# by the collision response. We log details to help diagnose the issue.
# We allow for a small tolerance to account for floating-point inaccuracies.
if self._last_total_energy is not None and current_total_energy > self._last_total_energy + self._config.epsilon:
warning_msg = f"\n{'=' * 40}\n"
warning_msg += (f" Total energy increased by "
f"{current_total_energy - self._last_total_energy:.4e}\n")
warning_msg += "="*40
qWarning(warning_msg)
# The collision graph holds pairs from the *previous* step, which is what we want
# to inspect as the likely cause of the energy increase.
colliding_pairs = []
for i, neighbours in enumerate(self._collision_graph):
for j in neighbours:
if i < j:
colliding_pairs.append((i, j))
if colliding_pairs:
qWarning(" State of bodies from previous step's collision graph:")
for i, j in colliding_pairs:
b1, b2 = self._bodies[i], self._bodies[j]
qWarning(f" Pair ({i}, {j}):")
qWarning(f" Body {i}: v={b1.v}, omega={b1.omega}")
qWarning(f" Body {j}: v={b2.v}, omega={b2.omega}")
self._last_total_energy = current_total_energy
def _compute_gravity(self, num_bodies: int) -> None:
"""
Computes gravitational forces for all bodies.
:param int num_bodies: Number of bodies to consider.
"""
if num_bodies == 0:
return
epsilon_sq = self._config.epsilon**2
# For small to medium numbers of bodies, the overhead of multiprocessing is not
# worth it. We use a single, fully vectorised calculation which is significantly
# faster than a Python loop.
if num_bodies < 250:
# We must initialize the worker state because the worker function relies on it.
_init_worker(self._bodies, self._config)
self._bodies.a[:num_bodies] = _calculate_force_chunk(
slice(0, num_bodies), self._config.universal_g, epsilon_sq)
else:
# For a large number of bodies, we parallelise the calculation across
# multiple processes to leverage all available CPU cores.
# 1. Create chunks of work for each process.
chunk_size = (num_bodies + self._num_processes -
1) // self._num_processes
chunks = [slice(i, i + chunk_size)
for i in range(0, num_bodies, chunk_size)]
# 2. Use starmap to pass multiple arguments to the worker function.
results = self._pool.starmap(_calculate_force_chunk, [ # type: ignore
(chunk, self._config.universal_g, epsilon_sq) for chunk in chunks])
self._bodies.a[:num_bodies] = np.vstack(results)
def _build_collision_graph(self, num_bodies: int) -> None: # pylint: disable=too-many-locals
"""
Builds the collision graph using a two-stage broad-phase/narrow-phase strategy
to efficiently detect all colliding pairs.
:param int num_bodies: The number of bodies in the simulation.
"""
if num_bodies < 2:
return
# 1. Clear previous collision graph data.
for neighbors in self._collision_graph:
neighbors.clear()
positions = self._bodies.x[:num_bodies]
radii = self._bodies.r[:num_bodies]
# --- Broad-Phase Collision Detection ---
# The goal of the broad phase is to quickly eliminate pairs of bodies that are
# definitely not colliding, creating a smaller list of "candidate" pairs for the
# more expensive narrow-phase check. We use a k-d tree.
# 1. Build two k-d trees on the body positions for efficient spatial queries.
# Using two trees is a common pattern for this type of query.
tree = cKDTree(positions)
# 2. Query the k-d tree to find all potentially colliding pairs.
# `query_pairs` finds all pairs of points whose distance is at most `r`.
# We set `r` to the diameter of the largest possible body to ensure we don't
# miss any potential collisions. `p=2.0` specifies Euclidean distance.
max_radius = np.max(radii) if num_bodies > 0 else 0
# This returns a set of tuples, which we convert to a list of lists for processing.
# We convert the set to a list for easier indexing.
candidate_pairs = list(tree.query_pairs(r=max_radius * 2, p=2.0))
# --- Narrow-Phase Collision Detection ---
# For the candidate pairs from the broad phase, perform a precise intersection test.
# This is done in a vectorised manner for performance.
if candidate_pairs:
# Extract the indices of the first and second body in each pair.
# `query_pairs` guarantees i < j for each pair (i, j).
indices_i, indices_j = np.array(candidate_pairs).T
# Perform the narrow-phase check for all candidate pairs at once.
# 1. Calculate the vector difference between the positions of each pair.
diffs = positions[indices_j] - positions[indices_i]
# 2. Calculate the squared distance for each pair.
dists_sq = np.sum(diffs**2, axis=1)
# 3. Calculate the squared sum of radii for each pair.
r_sums_sq = (radii[indices_i] + radii[indices_j])**2
# 4. Find the pairs that are actually colliding.
# We use itertools.compress for a memory-efficient way to iterate only
# over the pairs that satisfy the collision mask.
colliding_mask = dists_sq < r_sums_sq
for i, j in compress(candidate_pairs, colliding_mask):
# If they are colliding, add an edge to the graph for both nodes.
self._collision_graph[i].append(j)
self._collision_graph[j].append(i)
def _resolve_all_collisions(self, num_bodies: int) -> None:
"""
Resolves all collisions for the current step by colouring the collision
graph and handling collisions in parallelizable stages.
:param int num_bodies: The number of bodies in the simulation.
"""
# 1. Colour the collision graph to partition colliding bodies into independent sets.
# This returns the total number of colours (k) needed.
num_colours: Final[int] = self._colour_graph()
# 2. Process collisions in `k` stages, one for each colour.
# This loop is the key to safe collision resolution. By handling all collisions
# for one colour at a time, we ensure that we never simultaneously process two
# collisions that share a body (e.g., A-B and A-C), which would lead to incorrect
# results and race conditions.
for k in range(num_colours):
# Find all bodies that belong to the current colour stage.
# We iterate through these bodies and resolve their collisions.
# The collision response is complex and data-dependent, making it poorly suited
# for parallelization due to high overhead. We perform it sequentially here.
for i in range(num_bodies):
if self._colours[i] != k:
continue
for j in self._collision_graph[i]:
# To process each pair only once, we only handle pairs where i < j.
if i < j:
self._resolve_collision_pair(i, j)
def _update_adaptive_dt(self) -> None:
"""
Calculates a stable time step `dt` for the current frame based on the maximum
acceleration from the *previous* frame.
This makes the time step responsive to the simulation's dynamics, ensuring stability
during high-acceleration events (like close encounters) while allowing for efficiency
when bodies are far apart.
"""
num_bodies = len(self._bodies)
if num_bodies == 0:
self._dt = self._config.max_dt
return
# We work with squared values as long as possible to improve numerical stability
# and avoid unnecessary square root operations.
max_dt_sq = self._config.max_dt**2
# --- 1. Acceleration-Based Constraint (Prevent Large Positional Errors) ---
max_accel_sq = np.max(np.sum(self._bodies.a**2, axis=1))
dt_a_sq: float
if max_accel_sq > self._config.epsilon**2:
# This constraint is derived from the kinematic equation for displacement under
# constant acceleration: d = v₀t + 0.5at².
# Assuming the body starts from rest (v₀=0) for simplicity, we get d = 0.5at².
# We want to find the time `t` (our dt) it would take for the fastest-accelerating
# body to travel a small, fixed distance `TARGET_DX`.
#
# Solving for t² gives: t² = 2d / a
#
# We use this `t` as our target time step, ensuring that no body moves too far
# in a single step due to high acceleration.
dt_a_sq = 2.0 * self._config.target_dx / math.sqrt(max_accel_sq)
else:
dt_a_sq = max_dt_sq
# --- 2. Velocity-Based Constraint (Prevent Tunneling, CFL Condition) ---
# This is a Courant-Friedrichs-Lewy (CFL) condition, ensuring a body does not
# travel more than a fraction of its own radius in a single step.
vel_sq = np.sum(self._bodies.v**2, axis=1)
max_vel_sq = np.max(vel_sq)
dt_v_sq: float
if max_vel_sq > self._config.epsilon**2:
# This constraint prevents a fast-moving body from "tunnelling" through another
# body in a single time step. The heuristic states that a body should not travel
# more than a fraction of its own size in one step.
#
# The condition is: v * dt < C * radius (where C is a safety factor, e.g., 0.5)
#
# Solving for dt gives: dt < (C * radius) / v
# Squaring both sides gives: dt² < (C * radius)² / v²
#
# We use the radius of the fastest-moving body for this calculation.
fastest_body_radius = self._bodies.r[np.argmax(vel_sq)]
dt_v_sq = (0.5 * fastest_body_radius)**2 / max_vel_sq
else:
dt_v_sq = max_dt_sq
# --- 3. Combine Constraints and Apply Damping ---
# The final target dt is the square root of the minimum of the squared constraints.
target_dt = math.sqrt(
min(max_dt_sq, dt_a_sq, dt_v_sq) + self._config.epsilon)
# Smooth the change in dt using an exponential moving average (a low-pass filter)
# to prevent rapid oscillations in the time step.
self._dt = self._dt * (1 - self._config.dt_damping_factor) + \
target_dt * self._config.dt_damping_factor
[docs]
def compute_dynamics(self, iteration: int) -> None:
"""
Computes one step of the simulation, including dynamics and collisions.
:param int iteration: Current iteration.
"""
# ======================================================================================
# --- 1. SETUP & LOGGING ---
# ======================================================================================
self._current_iteration = iteration # Store iteration for detailed logging
self._log_system_energy(iteration)
# Number of bodies in the space.
num_bodies: Final[int] = len(self._bodies)
# ======================================================================================
# --- 2. ADAPTIVE TIME-STEPPING ---
# ======================================================================================
self._update_adaptive_dt()
# ======================================================================================
# --- 3. VELOCITY VERLET INTEGRATION ---
# ======================================================================================
# The Velocity Verlet algorithm is split into two parts around the force calculation.
# This structure provides better numerical stability than a simple Euler integration.
start_time = time.perf_counter()
# --- Part 1: Update positions and half-step velocities ---
# This first integration step should be based entirely on the state at the beginning
# of the frame, which means using the time step from the previous frame (`_previous_dt`).
self._bodies.integrate_part1(self._previous_dt)
self._profiling.integration += time.perf_counter() - start_time
# ======================================================================================
# --- 4. FORCE CALCULATION & COLLISION HANDLING ---
# ======================================================================================
# Now that bodies are in their new positions, we calculate all forces and handle
# interactions.
start_time = time.perf_counter()
# Find all pairs of colliding bodies.
self._build_collision_graph(num_bodies)
self._profiling.collision_detection += time.perf_counter() - start_time
start_time = time.perf_counter()
# --- Collision Response ---
# Resolve the detected collisions by applying impulses and positional corrections.
self._resolve_all_collisions(num_bodies)
self._profiling.collision_response += time.perf_counter() - start_time
start_time = time.perf_counter()
# Reset accelerations
self._bodies.a.fill(0)
self._bodies.torque.fill(0)
self._bodies.alpha.fill(0)
# --- Gravitational Forces ---
# Calculate the gravitational acceleration on each body.
self._compute_gravity(num_bodies)
self._profiling.force_calculation += time.perf_counter() - start_time
# --- Convert Torque to Angular Acceleration ---
# alpha = torque * I_inv. We do this after all torques for the frame are accumulated.
self._bodies.alpha[:] = self._bodies.torque * \
self._bodies.i_inv[:, np.newaxis]
# --- Part 2: Complete the velocity integration for the CURRENT step ---
# To maintain stability with an adaptive timestep, the second half of the velocity
# update must use the same timestep as the first half.
self._bodies.integrate_part2(self._previous_dt)
for _, b in enumerate(self._bodies):
b.dampen_velocity(self._config.linear_damping,
self._config.angular_damping)
# ======================================================================================
# --- 5. CLEANUP ---
# ======================================================================================
# Store the dt we just used for this frame's integration for the energy log.
self._previous_dt = self._dt
@property
def bodies(self) -> list[BodyProxy]:
"""
Provides an AoS-like list of body proxies for the display controller.
:returns: List of body proxies.
:rtype: list[BodyProxy]
"""
return [self._bodies[i] for i in range(len(self._bodies))]
[docs]
def get_body_data_for_display(self) -> tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
"""
Provides current simulation data required for display as NumPy arrays.
This is optimised for efficient data transfer to the UI thread.
:returns: A tuple containing (positions, quaternions, torques, angular_accelerations).
:rtype: tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]
"""
n = len(self._bodies) # Use len(self._bodies) directly
return (self._bodies.x[:n], self._bodies.q[:n],
self._bodies.torque[:n], self._bodies.alpha[:n])