Source code for src.physics_worker

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

"""
Provides the `PhysicsWorker` class, which runs the core physics simulation in
a separate thread to keep the UI responsive.
:module: physics_worker
:author: Le Bars, Yoann
"""

from PySide6.QtCore import QObject, Signal, Slot
import numpy as np
from src.space import Space
from src.body import BodyProxy
from src.args import CmdLineArgs
from src.config import SimulationConfig


class PhysicsWorker(QObject):
    """
    Worker object that advances the physics simulation one step at a time.

    The worker is meant to be driven from a separate thread so that the UI
    stays responsive. After every computed step it emits the refreshed body
    data; once the configured number of iterations has been reached it emits
    a completion signal instead.

    :ivar Space _space: The physics simulation instance.
    :ivar int _iteration_count: The current simulation iteration.
    :ivar int _max_iterations: The total number of iterations to run.
    """

    # Signal carrying the updated body data: positions, quaternions, torques
    # and alphas, followed by the index of the iteration that produced them.
    # Using raw NumPy arrays for data transfer is a key optimisation, as it
    # avoids the overhead of creating and sending thousands of individual
    # Python objects every frame.
    updated_body_data = Signal(
        np.ndarray, np.ndarray, np.ndarray, np.ndarray, int)

    # Signal emitted when a step is requested but no iterations remain.
    simulation_finished = Signal()

    def __init__(self, args: CmdLineArgs, parent: QObject | None = None):
        """
        Initialises the physics worker.

        Builds the simulation configuration from the command-line arguments,
        creates the simulation space with the requested time step, and
        records how many iterations are to be run.

        :param CmdLineArgs args: The parsed command-line arguments.
        :param QObject | None parent: The parent QObject, if any.
        """
        super().__init__(parent)
        sim_config = SimulationConfig.from_args(args)
        self._space = Space(sim_config, args.dt)
        self._iteration_count = 0
        self._max_iterations = sim_config.n_iter

    @Slot()
    def start_simulation(self) -> None:
        """
        Starts the simulation in the worker thread.

        Resets the iteration counter and performs the first step.
        """
        self._iteration_count = 0
        # Only the first step is triggered here to kick off the simulation.
        # Subsequent steps will be requested by the Display's timer.
        self.perform_single_step()

    @Slot()
    def perform_single_step(self) -> None:
        """
        Performs a single step of the physics simulation.

        While iterations remain, the space is advanced by one step,
        ``updated_body_data`` is emitted with the fresh arrays and the index
        of the step just computed, and the iteration counter is incremented.
        Once the iteration limit has been reached, every call emits
        ``simulation_finished`` and does nothing else.
        """
        # Guard clause: nothing left to compute, only report completion.
        if self._iteration_count >= self._max_iterations:
            self.simulation_finished.emit()
            return

        self._space.compute_dynamics(self._iteration_count)
        # Exactly four arrays are expected back from the simulation space.
        pos, quat, torque, alpha = self._space.get_body_data_for_display()
        self.updated_body_data.emit(
            pos, quat, torque, alpha, self._iteration_count)
        # The counter is advanced only after the data has been emitted, so
        # the emitted index is that of the step just computed.
        self._iteration_count += 1

    def cleanup(self) -> None:
        """
        Cleans up resources held by the physics worker.

        Intended to be called at the end of the application's lifecycle.
        """
        # Releases the resources of the simulation space (per the original
        # author's note, this shuts down its multiprocessing pool), then
        # prints the final performance profiling report of the space.
        self._space.cleanup()
        self._space.print_profiling_report()

    @property
    def initial_bodies(self) -> list[BodyProxy]:
        """
        Provides access to the list of body proxies held by the space.

        The property defines no setter. It allows the UI thread to create
        visual representations of the bodies before the simulation's event
        loop begins.

        :returns: The value of the ``bodies`` attribute of the space.
        :rtype: list[BodyProxy]
        """
        return self._space.bodies