#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Provides the `PhysicsWorker` class, which runs the core physics simulation in
a separate thread to keep the UI responsive.
:module: physics_worker
:author: Le Bars, Yoann
"""
from PySide6.QtCore import QObject, Signal, Slot
import numpy as np
from src.space import Space
from src.body import BodyProxy
from src.args import CmdLineArgs
from src.config import SimulationConfig
class PhysicsWorker(QObject):
    """
    Worker object to run the physics simulation in a separate thread.

    Emits signals with updated body data after each simulation step.

    :ivar Space _space: The physics simulation instance.
    :ivar int _iteration_count: The current simulation iteration.
    :ivar int _max_iterations: The total number of iterations to run.
    """

    # Signal to emit updated body data. Using raw NumPy arrays for data
    # transfer is a key optimisation, as it avoids the overhead of creating
    # and sending thousands of individual Python objects every frame.
    # Payload: positions, quaternions, torques, alphas, iteration index.
    updated_body_data = Signal(
        np.ndarray, np.ndarray, np.ndarray, np.ndarray, int)

    # Emitted when a step is requested after all iterations have been run.
    simulation_finished = Signal()

    def __init__(
        self, args: CmdLineArgs, parent: QObject | None = None
    ) -> None:
        """
        Initialises the physics worker.

        Builds the simulation configuration from the command-line arguments
        and creates the simulation space with the requested time step.

        :param CmdLineArgs args: The parsed command-line arguments.
        :param QObject | None parent: The parent QObject, if any.
        """
        super().__init__(parent)
        config = SimulationConfig.from_args(args)
        self._space = Space(config, args.dt)
        self._iteration_count = 0
        self._max_iterations = config.n_iter

    @Slot()
    def start_simulation(self) -> None:
        """
        Starts the simulation loop in the worker thread.

        Resets the iteration counter and performs the first step.
        """
        self._iteration_count = 0
        # The first step is triggered to kick off the simulation. Subsequent
        # steps will be requested by the Display's timer.
        self.perform_single_step()

    @Slot()
    def perform_single_step(self) -> None:
        """
        Performs a single step of the physics simulation.

        While iterations remain, advances the simulation by one step and
        emits ``updated_body_data`` with the resulting arrays and the index
        of the step that was just computed. Once the iteration budget is
        exhausted, emits ``simulation_finished`` instead.
        """
        if self._iteration_count >= self._max_iterations:
            self.simulation_finished.emit()
            return

        self._space.compute_dynamics(self._iteration_count)
        positions, quaternions, torques, alphas = (
            self._space.get_body_data_for_display())
        # The emitted index is that of the step just computed; the counter is
        # only advanced afterwards.
        self.updated_body_data.emit(
            positions, quaternions, torques, alphas, self._iteration_count)
        self._iteration_count += 1

    def cleanup(self) -> None:
        """Cleans up resources held by the physics worker."""
        # This method is called at the end of the application's lifecycle to
        # ensure that the multiprocessing pool is properly shut down and to
        # print the final performance profiling report from the simulation
        # space.
        self._space.cleanup()
        self._space.print_profiling_report()

    @property
    def initial_bodies(self) -> list[BodyProxy]:
        """
        Provides read-only access to the initial list of body proxies.

        This allows the UI thread to create visual representations of the
        bodies before the simulation's event loop begins.

        :returns: The ``bodies`` attribute of the simulation space.
        """
        return self._space.bodies