#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Simple interaction model with several bodies.
:module: space
:author: Le Bars, Yoann
This module is part of the pure Python benchmark.
This program is free software; you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by the
Free Software Foundation; either version 3 of the License, or (at your
option) any later version. See file `LICENSE` or go to:
https://www.gnu.org/licenses/gpl-3.0.html
"""
# Defines the public names of this module.
__all__ = ['Space']
from typing import Final
import time
import math
from dataclasses import dataclass
from multiprocessing import Pool, cpu_count
from scipy.spatial import cKDTree
import numpy as np
from PySide6.QtCore import qWarning, qDebug
from src.body import Bodies, BodyProxy
from src import constants
@dataclass
class CollisionContext:
"""
A data container to hold all relevant information for a single collision event.
This simplifies passing collision data between functions.
"""
b1: BodyProxy
b2: BodyProxy
n_vec: np.ndarray
r1_vec: np.ndarray
r2_vec: np.ndarray
v_rel: np.ndarray
overlap: float
@classmethod
def from_bodies(cls, b1: BodyProxy, b2: BodyProxy) -> 'CollisionContext | None':
"""
Factory method to create a CollisionContext if two bodies are colliding.
:param BodyProxy b1: First body.
:param BodyProxy b2: Second body.
:returns: A CollisionContext object if a collision occurs, otherwise None.
:rtype: CollisionContext | None
"""
d_vec = b2.x - b1.x
dist_sq = np.dot(d_vec, d_vec)
r_sum = b1.r + b2.r
if dist_sq >= r_sum**2 or dist_sq == 0:
return None
dist = math.sqrt(dist_sq)
n_vec = d_vec / dist
r1_vec = -b1.r * n_vec
r2_vec = b2.r * n_vec
v_rel = (b2.v + np.cross(b2.omega, r2_vec)) - \
(b1.v + np.cross(b1.omega, r1_vec))
overlap = (b1.r + b2.r) - dist
return cls(b1, b2, n_vec, r1_vec, r2_vec, v_rel, overlap)
def _calculate_force_chunk(args: tuple) -> np.ndarray:
"""
Worker function to calculate gravitational forces for a subset of bodies using a fully
vectorized approach. This is a top-level function as required by `multiprocessing`.
The function calculates the gravitational acceleration exerted by all bodies in the
simulation on a specific subset (chunk) of bodies.
:param tuple args: A tuple containing the following arguments:
- **indices** (*slice*): A slice object defining the subset of bodies
to calculate forces for.
- **positions** (*np.ndarray*): Array of positions for all bodies.
Shape: `(num_bodies, 3)`.
- **masses** (*np.ndarray*): Array of masses for all bodies.
Shape: `(num_bodies,)`.
- **G** (*float*): The universal gravitational constant.
- **epsilon_sq** (*float*): The squared softening factor to prevent
singularities.
:returns: The calculated accelerations for the given indices.
:rtype: np.ndarray
"""
# Unpack arguments for clarity.
indices, positions, masses, G, epsilon_sq = args # pylint: disable=invalid-name
# `diffs`: 3D array of position difference vectors.
# Shape: (num_bodies_in_chunk, total_num_bodies, 3)
diffs = positions - positions[indices, np.newaxis, :]
# `dists_sq`: 2D array of squared distances.
# We use `out=...` with `einsum` to avoid creating an intermediate `diffs**2` array.
# Shape: (num_bodies_in_chunk, total_num_bodies)
dists_sq = np.einsum('ijk,ijk->ij', diffs, diffs)
# Apply softening factor to avoid division by zero for bodies at the same position.
# This also prevents taking the square root of zero.
dists_sq[dists_sq < epsilon_sq] = epsilon_sq
# Calculate 1 / distance^3 in-place to save memory.
dists_sq **= -1.5
# `force_magnitudes` now contains `G * m_j / d_ij^3` for each pair.
force_magnitudes = G * masses * dists_sq
# Use einsum to calculate the sum of acceleration vectors. This is equivalent to
# `np.sum(force_magnitudes[:, :, np.newaxis] * diffs, axis=1)` but can be faster.
return np.einsum('ij,ijk->ik', force_magnitudes, diffs)
# pylint: disable=too-many-instance-attributes
[docs]
class Space:
"""
Class describing a space in which move several bodies.
:ivar float __dt: Model time step (in s).
:ivar float __previous_dt: Previous model time step (in s).
:ivar float __G: Universal gravity constant (in m³/kg/s²).
:ivar float __epsilon: Computing precision.
:ivar Bodies _bodies: The main SoA container.
:ivar list[list[int]] _collision_graph: Graph of collisions.
:ivar list[int] _colours: Colours used for spatialised collision handling.
:ivar bool __diagnostics: Whether to enable performance profiling.
:ivar Pool _pool: A pool of worker processes for parallel computation.
:ivar float _last_total_energy: The total energy from the previous logged step.
:ivar int _current_iteration: Helper for detailed logging.
:ivar bool _logged_collision_details: Helper for detailed logging.
"""
__dt: float
__previous_dt: float
__G: float
__epsilon: float
_bodies: Bodies
# Adjacency list for the collision graph
_collision_graph: list[list[int]]
_colours: list[int] # Node colours for graph colouring
_pool: Pool
_last_total_energy: float
_prof_integration: float
_prof_collision_detection: float
_prof_collision_response: float
_prof_force_calculation: float
_current_iteration: int
_logged_collision_details: bool
def _initialize_bodies(self, n: int, dens: float, seed: int) -> None: # pylint: disable=too-many-locals
"""
Actual initialisation of the bodies.
:param int n: Number of bodies.
:param float dens: Bodies density.
:param int seed: Seed for random number generation.
"""
# Use NumPy’s random generator for vectorized operations
rng = np.random.default_rng(seed if seed != 0 else None)
# Generate masses and radii for all bodies at once
masses = rng.uniform(25.0, 75.0, n)
radii = masses * dens
# Generate initial velocities for all bodies at once
velocities = rng.uniform(-5.0, 5.0, (n, 3))
# Maximum number of tries to position a body without overlap
max_retries: Final[int] = 100
# Place bodies one by one, but with an optimized overlap check
for i in range(n):
# Mass, radius, and initial velocity for the current body.
m, r, v0 = masses[i], radii[i], velocities[i]
# Current body initial position.
x0: np.ndarray
placed = False
for _ in range(max_retries):
# Generate a new tentative position
x0_try = rng.uniform(-100.0, 100.0, 3)
x0 = x0_try # Store the last attempt
# If it is the first body, no need to check for overlap
if i == 0:
x0 = x0_try
placed = True
break
# Vectorized overlap check against all previously placed bodies.
# This is much faster than iterating through each placed body.
# `diffs` is an array of vectors from each existing body to the new
# tentative position. NumPy broadcasting handles this efficiently.
# Shape: (i, 3) where i is the number of bodies already placed.
diffs = x0_try - self._bodies.x[:i]
# `dists_sq` is an array of squared distances to each existing body.
dists_sq = np.sum(diffs**2, axis=1)
# `r_sums_sq` is an array of the squared sum of radii (the minimum
# distance allowed to avoid overlap).
r_sums_sq = (r + self._bodies.r[:i])**2
# `np.any()` checks if any squared distance is less than the required
# minimum squared distance. If no overlaps are found, we accept the
# position and break the retry loop.
if not np.any(dists_sq < r_sums_sq):
placed = True
break
if not placed:
qWarning(
f"Could not place body {i} without overlap after "
f"{max_retries} attempts. Placing it at the last tried position, "
"which may cause initial instability."
)
self._bodies.emplace_back(m, r, x0, v0)
def __init__(self, args: 'CmdLineArgs') -> None:
"""
Class initialiser.
:param CmdLineArgs args: The command-line arguments containing simulation parameters.
"""
self.__dt = args.dt
self.__previous_dt = args.dt
self.__G = args.universal_g
self.__epsilon = args.epsilon if args.epsilon > 0. else constants.EPSILON
self._bodies = Bodies(args.n_bodies)
self._current_iteration = 0
self._logged_collision_details = False
self._collision_graph = [[] for _ in range(args.n_bodies)]
self._colours = [-1] * args.n_bodies
self._last_total_energy = 0.0
# --- Multiprocessing Pool Initialization ---
# R1732 is a false positive here. The pool's lifecycle is tied to the Space object's
# lifetime, not a smaller 'with' block. Cleanup is handled in the cleanup() method.
self._pool = Pool(processes=cpu_count() # pylint: disable=consider-using-with
)
self._initialize_bodies(args.n_bodies, args.dens, args.seed)
self._prof_integration = 0.0
self._prof_collision_detection = 0.0
self._prof_collision_response = 0.0
self._prof_force_calculation = 0.0
# Initialize with a very high value to avoid false positive on first check
self._last_total_energy = float('inf')
# --- Adaptive Time-step Warm-up ---
# The simulation starts with zero acceleration. If we used the default `dt`, the first
# step might be too large if bodies are close, causing an instability. To prevent this,
# we perform a few "warm-up" iterations with a very small `dt`.
# This allows the system to calculate initial gravitational forces and accelerations,
# and lets the adaptive time-stepper converge on a safe, stable `dt` before the
# actual simulation begins.
# We use a tiny initial dt to kickstart the process.
self.__dt = self.__previous_dt = self.__epsilon
# Run a few steps of the dynamics calculation.
for _ in range(5):
# This computes forces and updates `self.__dt` via the adaptive algorithm.
self.compute_dynamics(0)
[docs]
def cleanup(self) -> None:
"""
Clean up resources, such as closing the multiprocessing pool.
"""
# Close the pool to prevent any new tasks from being submitted.
self._pool.close()
# Wait for all worker processes to complete their tasks and exit.
self._pool.join()
[docs]
def print_profiling_report(self) -> None:
"""
Prints a formatted report of the time spent in different stages of the simulation
if diagnostics were enabled.
"""
profiling_data = {
"Integration": self._prof_integration,
"Force Calculation": self._prof_force_calculation,
"Collision Detection": self._prof_collision_detection,
"Collision Response": self._prof_collision_response,
}
total_time = sum(profiling_data.values())
if total_time < constants.EPSILON:
qDebug("\n--- Profiling Report (No data) ---")
return
report = "\n--- Profiling Report ---\n"
for name, time_spent in profiling_data.items():
percentage = time_spent / total_time * 100
report += f"{name:<22} {time_spent:9.4f}s ({percentage:5.1f}%)\n"
report += "-" * 40 + "\n"
report += f"{'Total Instrumented Time':<22} {total_time:9.4f}s"
qDebug(report)
def _calculate_energies(self) -> tuple[float, float, float]: # pylint: disable=too-many-locals
"""
Computes the total translational kinetic, rotational kinetic, and gravitational
potential energy of the system using vectorized calculations.
:returns: A tuple containing (translational KE, rotational KE, potential energy).
:rtype: tuple[float, float, float]
"""
# Number of bodies in the space.
num_bodies = len(self._bodies)
if num_bodies == 0:
return 0.0, 0.0, 0.0
# --- Kinetic Energies (Vectorized) ---
# Translational KE = 0.5 * m * v^2
# We use einsum for an efficient, single-pass calculation of sum(m_i * v_i^2).
ke_trans = 0.5 * np.einsum('i,ij,ij->', self._bodies.m,
self._bodies.v, self._bodies.v)
# Rotational KE = 0.5 * I * ω^2
# We only consider bodies with a valid moment of inertia (i_inv > 0).
valid_inertia_mask = self._bodies.i_inv > 0
inertia = np.zeros_like(self._bodies.i_inv)
inertia[valid_inertia_mask] = 1.0 / \
self._bodies.i_inv[valid_inertia_mask]
omega_sq = np.sum(self._bodies.omega**2, axis=1)
ke_rot = 0.5 * np.sum(inertia * omega_sq)
# --- Potential Energy (Vectorized) ---
# Potential Energy U = -G * m1 * m2 / r
# We calculate this for all unique pairs of bodies.
# Get upper triangle indices to select unique pairs (i, j) where i < j.
i_indices, j_indices = np.triu_indices(num_bodies, k=1)
# Calculate distance vectors and squared distances for all pairs at once.
dist_vecs = self._bodies.x[j_indices] - self._bodies.x[i_indices]
dist_sq = np.sum(dist_vecs**2, axis=1)
# Filter out pairs that are too close to avoid division by zero.
valid_dist_mask = dist_sq > self.__epsilon**2
dist = np.sqrt(dist_sq[valid_dist_mask])
# Calculate potential energy only for valid pairs and sum them up.
m1 = self._bodies.m[i_indices][valid_dist_mask]
m2 = self._bodies.m[j_indices][valid_dist_mask]
potential = -self.__G * np.sum((m1 * m2) / dist)
return ke_trans, ke_rot, potential
def _resolve_interpenetration(self, b1: BodyProxy, b2: BodyProxy, overlap: float,
n_vec: np.ndarray) -> None:
"""
Resolves interpenetration by moving bodies apart along the collision normal.
**Why this is important for stability:**
Due to discrete time steps in the simulation, bodies can slightly overlap (interpenetrate)
before a collision is detected. If this overlap is not corrected, bodies can sink into
each other, causing the collision response to apply massive, unrealistic forces in
subsequent frames. This leads to numerical instability and "explosions" where energy is
not conserved.
This method applies a direct positional correction to move the bodies so they are just
touching. This prevents the sinking problem and ensures that the impulse-based velocity
response is calculated from a valid, non-overlapping state, which is crucial for a
stable simulation.
:param BodyProxy b1: First body.
:param BodyProxy b2: Second body.
:param float overlap: The amount of overlap.
:param np.ndarray n_vec: The collision normal vector.
"""
# The total correction is distributed between the two bodies based on their inverse mass.
# This means lighter bodies move more, and immovable bodies (inv_m=0) don't move at all.
total_inv_mass = b1.inv_m + b2.inv_m
if total_inv_mass < self.__epsilon:
return
# A small percentage of the overlap is corrected to avoid jittering.
percent = 0.8 # 80% correction
correction = (overlap * percent / total_inv_mass) * n_vec
if b1.inv_m > 0:
self._bodies.x[b1._index] -= correction * \
b1.inv_m # pylint: disable=protected-access
if b2.inv_m > 0:
self._bodies.x[b2._index] += correction * \
b2.inv_m # pylint: disable=protected-access
def _get_effective_mass(self, ctx: CollisionContext, direction_vec: np.ndarray) -> float:
"""
Calculates the effective mass of two colliding bodies in a given direction.
The "effective mass" represents the system's total resistance to acceleration along
the specified direction, accounting for both linear and rotational inertia.
:param CollisionContext ctx: The collision context.
:param np.ndarray direction_vec: The direction vector (e.g., normal or tangent).
:returns: The scalar effective mass.
:rtype: float
"""
inv_mass_sum = ctx.b1.inv_m + ctx.b2.inv_m
# Rotational inertia contribution for body 1: (r1 x n)^2 / I1
term1 = np.dot(np.cross(ctx.r1_vec, direction_vec),
np.cross(ctx.r1_vec, direction_vec)) * ctx.b1.i_inv
# Rotational inertia contribution for body 2: (r2 x n)^2 / I2
term2 = np.dot(np.cross(ctx.r2_vec, direction_vec),
np.cross(ctx.r2_vec, direction_vec)) * ctx.b2.i_inv
return 1.0 / (inv_mass_sum + term1 + term2)
def _apply_restitution_impulse(self, ctx: CollisionContext) -> tuple[np.ndarray, float]:
"""
Calculates and applies the normal impulse (restitution) for a collision.
:param CollisionContext ctx: The collision context.
:returns: A tuple containing the impulse vector and its magnitude.
:rtype: tuple[np.ndarray, float]
"""
v_rel_normal = np.dot(ctx.v_rel, ctx.n_vec)
if v_rel_normal > 0:
# Bodies are moving apart, no impulse needed.
return np.zeros(3), 0.0
effective_mass_n = self._get_effective_mass(ctx, ctx.n_vec)
jn = -(1.0 + constants.COEFF_RESTITUTION) * \
v_rel_normal * effective_mass_n
jn = max(0, jn)
return jn * ctx.n_vec, jn
def _apply_friction_impulse(self, ctx: CollisionContext, jn: float,
impulse_n: np.ndarray) -> np.ndarray:
"""
Calculates and applies the friction impulse for a collision.
:param CollisionContext ctx: The collision context.
:param float jn: The magnitude of the normal impulse, used for the friction limit.
:param np.ndarray impulse_n: The normal impulse vector.
:returns: The friction impulse vector.
:rtype: np.ndarray
"""
# Calculate the relative velocity in the tangent plane.
v_t = ctx.v_rel - np.dot(ctx.v_rel, ctx.n_vec) * ctx.n_vec
v_t_norm = np.linalg.norm(v_t)
# If there's no tangential velocity, there's no friction.
if v_t_norm < self.__epsilon:
return np.zeros(3)
t_vec = v_t / v_t_norm
effective_mass_t = self._get_effective_mass(ctx, t_vec)
# Calculate the impulse required to stop tangential motion.
jt = -np.dot(ctx.v_rel, t_vec) * effective_mass_t
# Clamp the friction impulse to the friction cone (Coulomb's law).
static_friction_limit = constants.COEFF_STATIC_FRICTION * jn
jt = np.clip(jt, -static_friction_limit, static_friction_limit)
return jt * t_vec
def _handle_collision(self, b1: BodyProxy, b2: BodyProxy) -> None:
"""
Handle a collision between two bodies.
:param BodyProxy b1: First body.
:param BodyProxy b2: Second body.
"""
ctx = CollisionContext.from_bodies(b1, b2)
if not ctx:
return
self._resolve_interpenetration(ctx.b1, ctx.b2, ctx.overlap, ctx.n_vec)
impulse_n, jn = self._apply_restitution_impulse(ctx)
if jn > 0:
impulse_t = self._apply_friction_impulse(ctx, jn, impulse_n)
total_impulse = impulse_n + impulse_t
ctx.b1.apply_impulse(-total_impulse, ctx.r1_vec, self.__epsilon)
ctx.b2.apply_impulse(total_impulse, ctx.r2_vec, self.__epsilon)
def _color_graph(self) -> int:
"""
Implementation of a greedy graph colouring algorithm to partition the collision graph.
This is a crucial step for safe collision resolution. By assigning a "color" to each
body such that no two adjacent bodies (i.e., two bodies that are colliding) share the
same colour, we can process collisions in stages.
**Why this is useful for parallelism (or safe sequential processing):**
All bodies of the same colour form an "independent set," meaning none of them are
colliding with each other. Therefore, all collisions involving bodies of a specific
colour can be processed simultaneously (or in a single sequential pass) without causing
race conditions or conflicting updates. For example, when processing "color 0" bodies,
we can handle collisions (0, 5) and (1, 8) at the same time, because we know that
body 0 and body 1 are not colliding with each other.
The algorithm iterates through each body, checks the colors of its neighbors in the
collision graph, and assigns the smallest available colour to the current body.
:returns: The total number of colours used (k), which corresponds to the number of
sequential stages needed for collision resolution.
:rtype: int
"""
# Number of bodies in the space.
num_bodies: Final[int] = len(self._bodies)
# Initialise all colours to -1 (uncoloured).
self._colours = [-1] * num_bodies
# Keep track of the highest colour index used so far.
max_colour = 0
# Iterate through each body (node) in the graph.
for i, neighbors in enumerate(self._collision_graph):
# `used_colours` is a boolean array to mark which colours are already taken by
# the neighbours of the current body `i`. The size is `max_colour + 2` to have
# indices from 0 to `max_colour` and one extra slot for the next potential new colour.
used_colours = [False] * (max_colour + 2)
for neighbor_idx in neighbors:
if self._colours[neighbor_idx] != -1:
# Mark the neighbour’s colour as used.
used_colours[self._colours[neighbor_idx]] = True
# Find the first available colour (the smallest integer not used by neighbours).
colour = 0
while colour < len(used_colours) and used_colours[colour]:
colour += 1
# Assign the found colour to the current body.
self._colours[i] = colour
if colour > max_colour:
max_colour = max(max_colour, colour)
return max_colour + 1
def _log_system_energy(self, iteration: int) -> None:
"""
Logs the system's energy at specified intervals if diagnostics are enabled.
:param int iteration: Current iteration number.
"""
# This diagnostic runs periodically to check for energy conservation, a key indicator of
# simulation stability. It should only read data, not modify the simulation state.
if not (iteration > 0 and iteration % 100 == 0):
return
ke_trans, ke_rot, potential = self._calculate_energies()
total_ke = ke_trans + ke_rot
current_total_energy = total_ke + potential
qDebug(f"\n--- Iteration {iteration}: System Energy ---")
qDebug(f" Translational KE: {ke_trans:12.4e}")
qDebug(f" Rotational KE: {ke_rot:12.4e}")
qDebug(f" Potential Energy: {potential:12.4e}")
qDebug(f" Total Energy: {current_total_energy:12.4e}")
qDebug(f" Time Step (dt): {self.__previous_dt:12.4e}")
# A significant increase in total energy indicates a numerical instability, often caused
# by the collision response. We log details to help diagnose the issue.
# We allow for a small tolerance to account for floating-point inaccuracies.
if current_total_energy > self._last_total_energy + self.__epsilon:
warning_msg = "="*40 + "\n"
warning_msg += (f" Total energy increased by "
f"{current_total_energy - self._last_total_energy:.4e}\n")
warning_msg += "="*40
qWarning(warning_msg)
# The collision graph holds pairs from the *previous* step, which is what we want
# to inspect as the likely cause of the energy increase.
colliding_pairs = []
for i, neighbors in enumerate(self._collision_graph):
for j in neighbors:
if i < j:
colliding_pairs.append((i, j))
if colliding_pairs:
qWarning(" State of bodies from previous step's collision graph:")
for i, j in colliding_pairs:
b1, b2 = self._bodies[i], self._bodies[j]
qWarning(f" Pair ({i}, {j}):")
qWarning(f" Body {i}: v={b1.v}, omega={b1.omega}")
qWarning(f" Body {j}: v={b2.v}, omega={b2.omega}")
self._last_total_energy = current_total_energy
def _compute_gravity(self, num_bodies: int) -> None:
"""
Computes gravitational forces for all bodies.
:param int num_bodies: Number of bodies to consider.
"""
if num_bodies > 0:
# Positions of all bodies.
positions = self._bodies.x[:num_bodies]
# Masses of all bodies.
masses = self._bodies.m[:num_bodies]
# Squared softening factor to avoid singularities.
epsilon_sq = self.__epsilon**2
# Create chunks of work for each process
# pylint: disable=protected-access
# Size of each chunk of work for the multiprocessing pool.
chunk_size = (num_bodies + self._pool._processes -
1) // self._pool._processes
# List of tuples, where each tuple contains the arguments for `_calculate_force_chunk`.
chunks = [
(slice(i, i + chunk_size), positions, masses, self.__G, epsilon_sq)
for i in range(0, num_bodies, chunk_size)
]
# List of acceleration arrays returned by the worker processes.
results = self._pool.map(_calculate_force_chunk, chunks)
self._bodies.a[:num_bodies] = np.vstack(results)
def _build_collision_graph(self, num_bodies: int) -> None: # pylint: disable=too-many-locals
"""
Builds the collision graph using a two-stage broad-phase/narrow-phase strategy
to efficiently detect all colliding pairs.
:param int num_bodies: The number of bodies in the simulation.
"""
if num_bodies < 2:
return
# 1. Clear previous collision graph data.
for neighbors in self._collision_graph:
neighbors.clear()
positions = self._bodies.x[:num_bodies]
radii = self._bodies.r[:num_bodies]
# --- Broad-Phase Collision Detection ---
# The goal of the broad phase is to quickly eliminate pairs of bodies that are
# definitely not colliding, creating a smaller list of "candidate" pairs for the
# more expensive narrow-phase check. We use a k-d tree.
# 1. Build two k-d trees on the body positions for efficient spatial queries.
# Using two trees is a common pattern for this type of query.
tree = cKDTree(positions)
# 2. Query the k-d tree to find all potentially colliding pairs.
# `query_pairs` finds all pairs of points whose distance is at most `r`.
# We set `r` to the diameter of the largest possible body to ensure we don't
# miss any potential collisions. `p=2.0` specifies Euclidean distance.
max_radius = np.max(radii) if num_bodies > 0 else 0
# This returns a set of tuples, which we convert to a list of lists for processing.
# We convert the set to a list for easier indexing.
candidate_pairs = list(tree.query_pairs(r=max_radius * 2, p=2.0))
# --- Narrow-Phase Collision Detection ---
# For the candidate pairs from the broad phase, perform a precise intersection test.
# This is done in a vectorized manner for performance.
if candidate_pairs:
# Extract the indices of the first and second body in each pair.
# `query_pairs` guarantees i < j for each pair (i, j).
indices_i, indices_j = np.array(candidate_pairs).T
# Perform the narrow-phase check for all candidate pairs at once.
# 1. Calculate the vector difference between the positions of each pair.
diffs = positions[indices_j] - positions[indices_i]
# 2. Calculate the squared distance for each pair.
dists_sq = np.sum(diffs**2, axis=1)
# 3. Calculate the squared sum of radii for each pair.
r_sums_sq = (radii[indices_i] + radii[indices_j])**2
# 4. Find the pairs that are actually colliding.
colliding_mask = dists_sq < r_sums_sq
for i, j in np.array(candidate_pairs)[colliding_mask]:
# If they are colliding, add an e
self._collision_graph[i].append(j)
self._collision_graph[j].append(i)
def _resolve_all_collisions(self, num_bodies: int) -> None:
"""
Resolves all collisions for the current step by colouring the collision
graph and handling collisions in parallelizable stages.
:param int num_bodies: The number of bodies in the simulation.
"""
# 1. Colour the collision graph to partition colliding bodies into independent sets.
# This returns the total number of colours (k) needed.
num_colours: Final[int] = self._color_graph()
# 2. Process collisions in k stages, one for each colour.
# This loop is the key to safe collision resolution. By handling all collisions
# for one colour at a time, we ensure that we never simultaneously process two
# collisions that share a body (e.g., A-B and A-C), which would lead to incorrect
# results and race conditions.
for c in range(num_colours):
# In this stage, we find all bodies `i` that have the current colour `c`.
for i in range(num_bodies):
if self._colours[i] != c:
continue
# For each body `i` of colour `c`, we resolve its collisions with all its
# neighbours `j`. Because of the graph colouring, we know that `_colours[j]`
# will never be equal to `c`.
for j in self._collision_graph[i]:
# The `i < j` check ensures that we handle each collision pair only once.
if i < j:
self._handle_collision(
self._bodies[i], self._bodies[j])
[docs]
def compute_dynamics(self, iteration: int) -> None:
"""
Computes one step of the simulation, including dynamics and collisions.
:param int iteration: Current iteration.
"""
# ======================================================================================
# --- 1. SETUP & LOGGING ---
# ======================================================================================
self._current_iteration = iteration # Store iteration for detailed logging
self._logged_collision_details = False # Reset log flag each frame
self._log_system_energy(iteration)
# Number of bodies in the space.
num_bodies: Final[int] = len(self._bodies)
# ======================================================================================
# --- 2. ADAPTIVE TIME-STEPPING ---
# ======================================================================================
# Calculate a stable time step `dt` for the current frame based on the maximum
# acceleration from the *previous* frame.
# --- Adaptive Time Stepping ---
# The time step for the current frame is calculated based on the state of the system
# at the end of the *previous* frame. This makes the time step responsive to the
# simulation's dynamics, ensuring stability during high-acceleration events (like
# close encounters or collisions) while allowing for efficiency when bodies are far apart.
max_a_sq = 0.0
if num_bodies > 0:
# Find the maximum squared acceleration among all bodies from the previous step.
max_a_sq = max(np.dot(body.a, body.a) for body in self._bodies)
target_dt: float
if max_a_sq > self.__epsilon**2:
# Heuristic for a stable time step: dt should be small enough that a body with the
# maximum acceleration does not travel more than a small target distance (TARGET_DX).
# The formula is derived from d = 0.5*a*t^2 => t = sqrt(2*d/a). We use a simplified
# version: dt = TARGET_DX / sqrt(max_a_sq). The time step is capped by MAX_DT.
target_dt = min(constants.MAX_DT,
constants.TARGET_DX / math.sqrt(max_a_sq))
else:
# If accelerations are negligible, use the maximum allowed time step.
target_dt = constants.MAX_DT
# Smooth the change in dt using an exponential moving average (a low-pass filter).
# This prevents rapid, large oscillations in the time step, which could introduce
# its own numerical instability. We move a fraction of the way towards the target dt.
damping_factor = 0.1
self.__dt = self.__dt * (1 - damping_factor) + \
target_dt * damping_factor
# ======================================================================================
# --- 3. VELOCITY VERLET INTEGRATION ---
# ======================================================================================
# The Velocity Verlet algorithm is split into two parts around the force calculation.
# This structure provides better numerical stability than a simple Euler integration.
start_time = time.perf_counter()
# --- Part 1: Update positions and half-step velocities ---
# To improve energy conservation with an adaptive timestep, we use a symmetric approach:
# - The position update uses the time step from the previous frame (`dt_pos`).
# - The velocity half-step uses the time step for the current frame (`dt`).
self._bodies.integrate_part1(self.__dt, self.__previous_dt)
self._prof_integration += time.perf_counter() - start_time
# ======================================================================================
# --- 4. FORCE CALCULATION & COLLISION HANDLING ---
# ======================================================================================
# Now that bodies are in their new positions, we calculate all forces and handle
# interactions.
start_time = time.perf_counter()
# Find all pairs of colliding bodies.
self._build_collision_graph(num_bodies)
self._prof_collision_detection += time.perf_counter() - start_time
start_time = time.perf_counter()
# --- Collision Response ---
# Resolve the detected collisions by applying impulses and positional corrections.
self._resolve_all_collisions(num_bodies)
self._prof_collision_response += time.perf_counter() - start_time
start_time = time.perf_counter()
# Reset accelerations
self._bodies.a.fill(0)
self._bodies.alpha.fill(0)
# --- Gravitational Forces ---
# Calculate the gravitational acceleration on each body.
self._compute_gravity(num_bodies)
self._prof_force_calculation += time.perf_counter() - start_time
# --- Part 2: Complete the velocity integration for the CURRENT step ---
self._bodies.integrate_part2(self.__dt)
# ======================================================================================
# --- 5. CLEANUP ---
# ======================================================================================
# Store the dt we just used for this frame's integration for the energy log.
self.__previous_dt = self.__dt
@property
def bodies(self) -> list[BodyProxy]:
"""
Provides an AoS-like list of body proxies for the display controller.
:returns: List of body proxies.
:rtype: list[BodyProxy]
"""
return [self._bodies[i] for i in range(len(self._bodies))]