Source code for lightwin.beam_calculation.tracewin.tracewin

"""Define a :class:`.BeamCalculator` that will call TraceWin from cmd line.

It inherits from the :class:`.BeamCalculator` base class. It solves the motion
of the particles in envelope or multiparticle mode, in 3D. Contrary to the
:class:`.Envelope1D` solver, it is not a real solver but an interface to
``TraceWin``, which must be installed on your machine.

"""

import logging
import shutil
import subprocess
from collections.abc import Sequence
from pathlib import Path
from typing import Any

from lightwin.beam_calculation.beam_calculator import BeamCalculator
from lightwin.beam_calculation.simulation_output.simulation_output import (
    SimulationOutput,
)
from lightwin.beam_calculation.tracewin.element_tracewin_parameters_factory import (
    ElementTraceWinParametersFactory,
)
from lightwin.beam_calculation.tracewin.simulation_output_factory import (
    SimulationOutputFactoryTraceWin,
)
from lightwin.core.accelerator.accelerator import Accelerator
from lightwin.core.elements.field_maps.field_map import FieldMap
from lightwin.core.list_of_elements.list_of_elements import ListOfElements
from lightwin.failures.set_of_cavity_settings import SetOfCavitySettings
from lightwin.tracewin_utils.interface import (
    beam_calculator_to_command,
    failed_cavities_to_command,
    set_of_cavity_settings_to_command,
)
from lightwin.util.typing import REFERENCE_PHASE_POLICY_T, BeamKwargs


class TraceWin(BeamCalculator):
    """Hold a TraceWin beam calculator.

    Parameters
    ----------
    executable :
        Path to the TraceWin executable.
    ini_path :
        Path to the ``INI`` TraceWin file.
    base_kwargs :
        TraceWin optional arguments. Override what is defined in ``INI``, but
        overridden by arguments from :class:`.ListOfElements` and
        :class:`.SimulationOutput`.
    out_folder :
        Name of the results folder (not a complete path, just a folder name).
    default_field_map_folder :
        Forwarded as-is to :class:`.BeamCalculator`.
    beam_kwargs :
        Forwarded to :class:`.BeamCalculator` and to the
        :class:`.SimulationOutputFactoryTraceWin`.
    reference_phase_policy :
        Forwarded as-is to :class:`.BeamCalculator`.
    cal_file :
        Optional file copied into ``path_cal`` at every call of
        :meth:`init_solver_parameters`.

    Attributes
    ----------
    _tracewin_command :
        Attribute to hold the value of the base command to call TraceWin.
    path_cal :
        Name of the results folder. Updated at every call of the
        :func:`init_solver_parameters` method, using
        ``Accelerator.accelerator_path`` and ``self.out_folder`` attributes.
    dat_file :
        Base name for the ``DAT`` file. ??

    """

    def __init__(
        self,
        executable: Path,
        ini_path: Path,
        base_kwargs: dict[str, str | int | float | bool | None],
        out_folder: Path | str,
        default_field_map_folder: Path | str,
        beam_kwargs: BeamKwargs,
        reference_phase_policy: REFERENCE_PHASE_POLICY_T = "phi_0_rel",
        cal_file: Path | None = None,
        **kwargs: Any,
    ) -> None:
        """Store the TraceWin-specific settings and init the base class."""
        # These attributes are set *before* ``super().__init__`` because the
        # base constructor calls ``_set_up_specific_factories`` (see its
        # docstring), which reads ``_beam_kwargs`` and ``_filename``.
        self.executable = executable
        # ``resolve`` already returns an absolute path.
        self.ini_path = ini_path.resolve()
        self.base_kwargs = base_kwargs
        self.cal_file = cal_file
        self._beam_kwargs = beam_kwargs

        # Name of the TraceWin results file read by the output factory.
        self._filename = (
            Path("partran1.out")
            if self.is_a_multiparticle_simulation
            else Path("tracewin.out")
        )

        super().__init__(
            reference_phase_policy=reference_phase_policy,
            out_folder=out_folder,
            default_field_map_folder=default_field_map_folder,
            beam_kwargs=beam_kwargs,
            **kwargs,
        )

        # Declared here, assigned later (``path_cal`` is set in
        # ``init_solver_parameters``).
        self.path_cal: Path
        self.dat_file: Path
        self._tracewin_command: list[str] | None = None

    def _set_up_specific_factories(self) -> None:
        """Set up the factories specific to the :class:`.BeamCalculator`.

        This method is called in the :meth:`.BeamCalculator.__init__`, hence it
        appears only in the base :class:`.BeamCalculator`.

        """
        self.beam_calc_parameters_factory = ElementTraceWinParametersFactory()
        self.simulation_output_factory = SimulationOutputFactoryTraceWin(
            _is_3d=self.is_a_3d_simulation,
            _is_multipart=self.is_a_multiparticle_simulation,
            _solver_id=self.id,
            _beam_kwargs=self._beam_kwargs,
            out_folder=self.out_folder,
            _filename=self._filename,
            beam_calc_parameters_factory=self.beam_calc_parameters_factory,
        )

    def _tracewin_base_command(
        self, accelerator_path: Path, **kwargs
    ) -> tuple[list[str], Path]:
        """Define the 'base' command for TraceWin.

        This part of the command is the same for every
        :class:`.ListOfElements` and every :class:`.Fault`. It sets the
        TraceWin executable, the ``INI`` file. It also defines
        ``base_kwargs``, which should be the same for every calculation.
        Finally, it sets ``path_cal``. But this path is more
        :class:`.ListOfElements` dependent...
        ``Accelerator.accelerator_path`` + ``out_folder``
        (+ ``fault_optimisation_tmp_folder``)

        Parameters
        ----------
        accelerator_path :
            Folder in which the ``out_folder`` results folder is created.
        kwargs :
            TraceWin arguments; they take precedence over ``base_kwargs``.

        Returns
        -------
        The command as a list of strings, and the calculation folder.

        """
        # Work on a copy so that the caller's dict is left untouched; keys
        # already given by the caller win over ``base_kwargs``.
        merged_kwargs = dict(kwargs)
        for key, val in self.base_kwargs.items():
            merged_kwargs.setdefault(key, val)

        path_cal = accelerator_path / self.out_folder
        path_cal.mkdir(parents=True, exist_ok=True)

        command = beam_calculator_to_command(
            self.executable,
            self.ini_path,
            path_cal,
            **merged_kwargs,
        )
        return command, path_cal

    def _tracewin_full_command(
        self,
        elts: ListOfElements,
        set_of_cavity_settings: SetOfCavitySettings | None,
        **kwargs,
    ) -> tuple[list[str], Path]:
        """Set the full TraceWin command.

        It contains the 'base' command, which includes every argument that is
        common to every calculation with this :class:`.BeamCalculator`: path
        to ``INI`` file, to executable...

        It contains the :class:`.ListOfElements` command: path to the ``DAT``
        file, initial energy and beam properties.

        It can contain some :class:`.SetOfCavitySettings` commands: ``ele``
        arguments to modify some cavities tuning.

        Returns
        -------
        The command as a list of strings, and the calculation folder.

        """
        accelerator_path = elts.files_info["accelerator_path"]
        command, path_cal = self._tracewin_base_command(
            accelerator_path, **kwargs
        )
        command.extend(elts.tracewin_command)

        # Without cavity settings, the cavity-specific arguments are skipped.
        if set_of_cavity_settings is None:
            return command, path_cal

        idx_first_element = elts[0].idx["elt_idx"]
        command.extend(
            set_of_cavity_settings_to_command(
                set_of_cavity_settings,
                phi_bunch_first_element=elts.input_particle.phi_abs,
                idx_first_element=idx_first_element,
            )
        )
        command.extend(
            failed_cavities_to_command(
                elts.l_cav,
                idx_first_element=idx_first_element,
            )
        )
        return command, path_cal

    # TODO what is specific_kwargs for? I should just have a function
    # set_of_cavity_settings_to_kwargs
    def run(
        self,
        elts: ListOfElements,
        update_reference_phase: bool = False,
        **specific_kwargs,
    ) -> SimulationOutput:
        """Run TraceWin.

        Parameters
        ----------
        elts :
            List of elements in which the beam must be propagated.
        update_reference_phase :
            To change the reference phase of cavities when it is different
            from the one asked in the ``TOML``. To use after the first
            calculation, if :attr:`.BeamCalculator.reference_phase_policy`
            does not align with :attr:`.CavitySettings.reference`.
        specific_kwargs :
            ``TraceWin`` optional arguments. Overrides what is defined in
            ``base_kwargs`` and ``INI``.

        Returns
        -------
        Holds energy, phase, transfer matrices (among others) packed into a
        single object.

        """
        return super().run(elts, update_reference_phase, **specific_kwargs)

    def run_with_this(
        self,
        set_of_cavity_settings: SetOfCavitySettings | None,
        elts: ListOfElements,
        use_a_copy_for_nominal_settings: bool = True,
        **specific_kwargs,
    ) -> SimulationOutput:
        """Perform a simulation with new cavity settings.

        Calling it with ``set_of_cavity_settings = None`` is the same as
        calling the plain :func:`run` method.

        Parameters
        ----------
        set_of_cavity_settings :
            The new cavity settings to try. If it is None, then the cavity
            settings are taken from the FieldMap objects.
        elts :
            List of elements in which the beam should be propagated.
        use_a_copy_for_nominal_settings :
            To copy the nominal :class:`.CavitySettings` and avoid altering
            their nominal counterpart. Set it to True during optimisation, to
            False when you want to keep the current settings. The default is
            True.
        specific_kwargs :
            Deprecated; a critical message is logged when it is not empty.
            The values are still forwarded to the TraceWin command.

        Returns
        -------
        Holds energy, phase, transfer matrices (among others) packed into a
        single object.

        """
        # ``**specific_kwargs`` is always a dict, never None: a truthiness
        # test is enough to detect that something was passed.
        if specific_kwargs:
            logging.critical(f"{specific_kwargs = }: deprecated.")

        set_of_cavity_settings = SetOfCavitySettings.from_incomplete_set(
            set_of_cavity_settings,
            elts.l_cav,
            use_a_copy_for_nominal_settings=use_a_copy_for_nominal_settings,
        )

        command, path_cal = self._tracewin_full_command(
            elts, set_of_cavity_settings, **specific_kwargs
        )
        # Working on a copy of the nominal settings is taken as the sign that
        # we are in a fit; the command is not logged in that case.
        is_a_fit = use_a_copy_for_nominal_settings
        exception = _run_in_bash(command, output_command=not is_a_fit)

        # check in which order those two methods should be called
        simulation_output = self._generate_simulation_output(
            elts,
            path_cal,
            exception,
            set_of_cavity_settings=set_of_cavity_settings,
        )
        self._post_treat_cavity_setttings(
            set_of_cavity_settings, elts.l_cav, simulation_output
        )
        return simulation_output

    def post_optimisation_run_with_this(
        self,
        optimized_cavity_settings: SetOfCavitySettings,
        full_elts: ListOfElements,
        **specific_kwargs,
    ) -> SimulationOutput:
        """Run TraceWin with optimized cavity settings.

        After the optimisation, we want to re-run TraceWin with the new
        settings. However, we need to tell it that the linac is bigger than
        during the optimisation. Concretely, it means:

        * Rephasing the cavities in the compensation zone.
        * Updating the ``index`` ``n`` of the cavities in the ``ele[n][v]``
          command.

        Note that at this point, the ``DAT`` has not been updated yet.

        Parameters
        ----------
        optimized_cavity_settings :
            Optimized parameters.
        full_elts :
            Contains the full linac.

        Returns
        -------
        Necessary information on the run.

        """
        optimized_cavity_settings.re_set_elements_index_to_absolute_value()

        # patch: to have the new settings saved in the .dat, we incorporate
        # new cavity settings now
        # for cavity in full_elts.l_cav:
        #     if cavity not in optimized_cavity_settings:
        #         continue
        #     cavity.cavity_settings = optimized_cavity_settings[cavity]
        # full_elts.store_settings_in_dat(
        #     full_elts.files_info["dat_file"], which_phase=self.reference_phase
        # )

        return self.run_with_this(
            optimized_cavity_settings, full_elts, **specific_kwargs
        )

    def init_solver_parameters(self, accelerator: Accelerator) -> None:
        """Set the ``path_cal`` variable.

        We also set the ``_tracewin_command`` attribute to None, as it must be
        updated when ``path_cal`` changes.

        .. note::
            Contrary to :class:`.Envelope1D` and :class:`.Envelope3D`, this
            routine does not set parameters for the :class:`.BeamCalculator`.
            As a matter of fact, TraceWin is a standalone code and does not
            need our solver parameters.
            However, if we want to save the meshing used by TraceWin, we will
            have to use the :class:`.ElementTraceWinParametersFactory` later.

        Raises
        ------
        FileNotFoundError
            If ``cal_file`` was provided but is not an existing file.

        """
        self.path_cal = Path(
            accelerator.get("accelerator_path"), self.out_folder
        )
        self.path_cal.mkdir(parents=True, exist_ok=True)

        self._tracewin_command = None

        if self.cal_file is None:
            return
        # Explicit check rather than ``assert``, which is stripped under -O.
        if not self.cal_file.is_file():
            raise FileNotFoundError(f"{self.cal_file = } is not a file.")
        shutil.copy(self.cal_file, self.path_cal)
        logging.debug(f"Copied {self.cal_file = } in {self.path_cal = }.")

    @property
    def is_a_multiparticle_simulation(self) -> bool:
        """Tell if the simulation is a multiparticle one.

        The ``"partran"`` key of ``base_kwargs`` is used when present.
        Otherwise, we look for a ``partran1.out`` file in ``path_cal``; as
        long as ``path_cal`` is not set (e.g. during ``__init__``), the answer
        is ``False``.

        """
        if "partran" in self.base_kwargs:
            return self.base_kwargs["partran"] == 1
        # ``path_cal`` only exists after ``init_solver_parameters`` was
        # called, but this property is already read in ``__init__``.
        path_cal = getattr(self, "path_cal", None)
        if path_cal is None:
            return False
        return Path(path_cal, "partran1.out").is_file()

    @property
    def is_a_3d_simulation(self) -> bool:
        """Tell if the simulation is in 3D (always the case here)."""
        return True

    def _post_treat_cavity_setttings(
        self,
        set_of_cavity_settings: SetOfCavitySettings | None,
        cavities: Sequence[FieldMap],
        simulation_output: SimulationOutput,
    ) -> None:
        """Store cavity settings in the appropriate :class:`.CavitySettings`.

        .. note::
            When we are under a fitting process, *i.e.* when
            ``set_of_cavity_settings`` is not ``None``, we update the
            :class:`.CavitySettings` in the ``set_of_cavity_settings``, not
            the ones in :attr:`.FieldMap.cavity_settings`.

        Parameters
        ----------
        set_of_cavity_settings :
            Settings used for the simulation, or None for a "normal" run.
        cavities :
            Cavities for which ``phi_bunch``, ``phi_s`` and ``v_cav_mv`` are
            stored.
        simulation_output :
            Object from which the values at the cavities entry are taken.

        """
        for cavity in cavities:
            # Select the object to update first, so that the simulation
            # output is not queried for cavities that are skipped.
            if set_of_cavity_settings is None:
                # Any cavity during a "normal" run
                settings = cavity.cavity_settings
            elif cavity in set_of_cavity_settings:
                # Compensating cavity during a fit
                settings = set_of_cavity_settings[cavity]
            else:
                # Non-compensating cavity during a fit
                continue

            phi_abs, v_cav_mv, phi_s = simulation_output.get(
                "phi_abs",
                "v_cav_mv",
                "phi_s",
                elt=cavity,
                pos="in",
                to_deg=False,
                to_numpy=False,
            )
            settings.phi_bunch = phi_abs
            settings.phi_s = phi_s
            settings.v_cav_mv = v_cav_mv
# =============================================================================
# Bash
# =============================================================================
def _run_in_bash(
    command: list[str], output_command: bool = True, output_error: bool = False
) -> bool:
    """Run the given command in a subprocess and tell if it failed.

    Despite the name, no shell is spawned: ``command`` is executed directly
    as an argument list. Standard output and standard error are captured.

    Parameters
    ----------
    command :
        Executable followed by its arguments.
    output_command :
        If True, the command is logged at the ``INFO`` level before running.
    output_error :
        If True and the command failed, the content of its standard error is
        logged at the ``WARNING`` level.

    Returns
    -------
    True if the process exited with a non-zero return code, False otherwise.

    """
    if output_command:
        formatted = "\n\t".join(command)
        logging.info(f"Running command:\n\t{formatted}")

    # ``check=False``: a failure is reported through the returned flag, not
    # through an exception.
    completed = subprocess.run(command, capture_output=True, check=False)
    failed = completed.returncode != 0

    if failed and output_error:
        # Decode so that the log shows readable text and not a bytes repr.
        stderr = completed.stderr.decode(errors="replace")
        logging.warning(
            "A message was returned when executing following "
            f"command:\n\t{stderr}"
        )
    return failed