"""Define a factory to easily create |A|."""
import logging
from pathlib import Path
from typing import Any, Sequence
from warnings import warn
from lightwin.beam_calculation.beam_calculator import BeamCalculator
from lightwin.core.accelerator.accelerator import (
ACCELERATOR_STATUS_T,
Accelerator,
)
from lightwin.core.elements.field_maps.field_map import FieldMap
from lightwin.failures.strategy import determine_cavities
from lightwin.util.pickling import MyCloudPickler, MyPickler
from lightwin.util.typing import BeamKwargs
[docs]
class AcceleratorFactory:
"""A class to create accelerators."""
[docs]
def __init__(
self,
beam_calculators: BeamCalculator | Sequence[BeamCalculator | None],
files: dict[str, Any],
beam: BeamKwargs,
**kwargs,
) -> None:
"""Facilitate creation of |A| objects.
Parameters
----------
beam_calculators :
Objects that will compute propagation of the beam.
files :
Configuration entries for the input/output paths.
beam :
Configuration dictionary holding the initial beam parameters.
kwargs :
Other configuration dictionaries.
"""
self.dat_file = files["dat_file"]
self.project_folder = files["project_folder"]
#: Parsed dictionary holding path to the different pickles. Typical
#: structure:
#:
#: .. code-block::
#:
#: {
#: "Reference": "reference.pkl",
#: "scenarios": {
#: # Scenario 1: pre-computed solution (skips optimization)
#: 1: {"Solution": "solution-000001.pkl"},
#: # Scenario 2: alternatives with custom names
#: # (optimization still runs, the pickled Accelerators
#: # will be appended)
#: 2: {
#: "Solution": None,
#: "Conservative approach": "design-conservative.pkl",
#: "Aggressive tuning": "design-aggressive.pkl",
#: },
#: # Scenario 3: solution + alternatives
#: 3: {
#: "Solution": "solution-000003.pkl",
#: "Tweaked design": "tweaked.pkl",
#: "Experimental config": "experimental.pkl",
#: }
#: }
#: }
#:
self._pickle_paths = self._parse_pickle_config(
files.get("pickle_paths", {})
)
if isinstance(beam_calculators, BeamCalculator):
beam_calculators = (beam_calculators,)
self.beam_calculators = beam_calculators
main_beam_calculator = beam_calculators[0]
if main_beam_calculator is None:
raise ValueError("Need at least one working BeamCalculator.")
self.main_beam_calculator = main_beam_calculator
self._elts_factory = main_beam_calculator.list_of_elements_factory
self._beam = beam
self._pickler: MyPickler | None = None
@property
def pickler(self) -> MyPickler:
if self._pickler is None:
self._pickler = MyCloudPickler()
return self._pickler
[docs]
def create_all(
self, wtf: dict[str, Any] | None = None
) -> tuple[dict[int, list[Accelerator]], dict[str, Any] | None]:
"""Create reference and broken accelerators.
Also loads any additional pre-computed accelerators from pickle files
specified in the configuration.
Parameters
----------
wtf :
"What To Fail/Fit" configuration. Can contain automatic fault
generation parameters that will be resolved into specific cavity
failures.
Returns
-------
accelerators :
Dictionary where keys are |FS| indexes, and values are lists of
corresponding |A|. First index corresponds to reference accelerator
(no failure).
updated_wtf :
The resolved ``wtf`` configuration with explicit cavity failures.
None if no wtf was provided.
"""
reference = self.create_reference()
accelerators = {0: [reference]}
updated_wtf = None
if wtf is not None:
n_scenarios, updated_wtf = determine_cavities(reference.elts, wtf)
accelerators.update(self.create_all_broken(n_scenarios))
additional = self._load_additional_pickles(
reserved_names={"Reference", "Solution"}
)
if len(additional) > 0:
logging.warning(
"Behavior of additional Accelerator is not well defined. In "
"particular if there are several FaultScenario."
)
for index in accelerators:
to_add = additional.get(index)
if to_add is None:
continue
accelerators[index].extend(to_add)
return accelerators, updated_wtf
[docs]
def create_reference(self) -> Accelerator:
"""Unpickle or create from scratch the reference (nominal) accelerator.
Returns
-------
The nominal accelerator without failures.
"""
return self._create_one_accelerator(
name="Reference",
status="reference",
index=0,
output_path=self.project_folder / "000000_ref",
)
[docs]
def create_all_broken(
self, n_scenarios: int
) -> dict[int, list[Accelerator]]:
"""Unpickle or create from scratch several broken accelerators.
Parameters
----------
n_scenarios :
Number of broken accelerators to create. This is also the number
of |FS| we will create.
Returns
-------
Dict associating |FS| index to corresponding broken |A|.
"""
return {
i: [
self._create_one_accelerator(
name="Solution",
status="broken",
index=i,
output_path=self.project_folder / f"{i:06d}",
)
]
for i in range(1, n_scenarios + 1)
}
[docs]
def _create_one_accelerator(
self,
name: str,
status: ACCELERATOR_STATUS_T,
index: int,
output_path: Path,
) -> Accelerator:
"""Create or load a single accelerator.
Parameters
----------
name :
Accelerator name (e.g., ``"Reference"``, ``"Solution"``).
status :
Current status design. Ignored if the |A| is unpickled.
index :
Corresponding |FS| index. A null index is reserved for reference
accelerator.
output_path :
Path where accelerator data will be stored.
Returns
-------
Loaded from pickle if available, otherwise freshly created.
"""
pickle_path = self._get_pickle_path(name, index)
if pickle_path is not None:
accelerator = self._load_from_pickle(
name, index=index, pickle_path=pickle_path
)
if accelerator is not None:
logging.info(
f"Created {accelerator.id} Accelerator by unpickling "
f"'{pickle_path}'."
)
return accelerator
accelerator = self._build_accelerator(
name,
status,
index=index,
output_path=output_path,
pickle_path=pickle_path,
)
info = f"Created {accelerator.id} Accelerator"
if pickle_path:
info += f" (will be pickled to '{pickle_path}')"
logging.info(info + ".")
return accelerator
# =========================================================================
# Build Accelerator from scratch
# =========================================================================
[docs]
def _build_accelerator(
self,
name: str,
status: ACCELERATOR_STATUS_T,
index: int,
output_path: Path,
pickle_path: Path | None,
) -> Accelerator:
"""Build a new accelerator from scratch.
Parameters
----------
name :
Accelerator name.
status :
Current status design.
index :
Corresponding |FS| index. A null index is reserved for reference
accelerator.
output_path :
Path where accelerator data will be stored.
pickle_path :
Optional path where accelerator will be pickled after creation.
Returns
-------
Newly created accelerator instance.
"""
self._create_output_directories(output_path)
accelerator = Accelerator(
name=name,
status=status,
index=index,
dat_file=self.dat_file,
accelerator_path=output_path,
list_of_elements_factory=self._elts_factory,
pickle_path=pickle_path,
**self._beam,
)
self._check_consistency_reference_phase_policies(accelerator.l_cav)
return accelerator
[docs]
def _create_output_directories(self, output_path: Path) -> None:
"""Create output directory structure for an accelerator.
Creates the main accelerator directory and subdirectories for each
beam calculator.
The default structure will look like::
YYYY.MM.DD_HHhmm_SSs_MILLIms/
├── 000000_ref
│ ├── 0_Envelope1D/
│ └── 1_TraceWin/
├── 000001
│ ├── 0_Envelope1D/
│ └── 1_TraceWin/
├── 000002
│ ├── 0_Envelope1D/
│ └── 1_TraceWin/
├── 000003
│ ├── 0_Envelope1D/
│ └── 1_TraceWin/
└── lightwin.log
- The main ``YYYY.MM.DD_HHhMM_SSs_MILLIms/`` directory is created at
the same location as the original ``DAT`` file. You can override its
name with the ``project_folder`` key in the ``[files]`` ``TOML``
section.
- In every ``accelerator_path`` (eg ``000002/``), you will find one
directory per |BC|. In this example, compensation settings were found
with :class:`.Envelope1D` and a second simulation was made with
:class:`.TraceWin`.
"""
output_path.mkdir(parents=True, exist_ok=True)
for beam_calculator in self.beam_calculators:
if beam_calculator is None:
continue
beam_calculator_dir = output_path / beam_calculator.id
beam_calculator_dir.mkdir(parents=True, exist_ok=True)
[docs]
def _check_consistency_reference_phase_policies(
self, cavities: Sequence[FieldMap]
) -> None:
"""Check that solvers phases are consistent with ``DAT`` file.
Parameters
----------
cavities :
Sequence of cavity field maps to check.
"""
if len(cavities) == 0:
return
beam_calculators = [x for x in self.beam_calculators if x is not None]
policies = {
beam_calculator: beam_calculator.reference_phase_policy
for beam_calculator in beam_calculators
}
n_unique = len(set(policies.values()))
if n_unique > 1:
logging.warning(
"The different BeamCalculator objects have different "
"reference phase policies. This may lead to inconsistencies "
f"when cavities fail.\n{policies = }"
)
return
references = {x.cavity_settings.reference for x in cavities}
if len(references) > 1:
logging.info(
"The cavities do not all have the same reference phase."
)
# =========================================================================
# Related to pickling/unpickling Accelerators
# =========================================================================
[docs]
def _parse_pickle_config(
self, pickle_config: dict[str, str | dict[str, str]]
) -> dict[int, str | dict[str, str]]:
"""Parse pickle paths configuration from ``TOML``.
Note
----
When a Reference/Solution ``PKL`` is provided but does not exist,
the associated |A| will be pickled at the end of the simulation.
Parameters
----------
pickle_config :
Configuration ``TOML`` ``[files.pickle_paths]`` sub-dictionary. The
expected structure is:
.. code-block:: toml
[files.pickle_paths]
Reference = "reference.pkl"
# Scenario 1: pre-computed solution (skips optimization)
[files.pickle_paths.000001]
Solution = "solution-000001.pkl"
# Scenario 2: alternatives with custom names (optimization
# still runs, the pickled Accelerators will be appended)
[files.pickle_paths.000002]
"Conservative approach" = "design-conservative.pkl"
"Aggressive tuning" = "design-aggressive.pkl"
# Scenario 3: solution + alternatives
[files.pickle_paths.000003]
Solution = "solution-000003.pkl"
"Tweaked design" = "tweaked.pkl"
"Experimental config" = "experimental.pkl"
Returns
-------
Parsed configuration with structure:
.. code-block:: python
{
0: "reference.pkl",
# Scenario 1: pre-computed solution (skips
# optimization)
1: {"Solution": "solution-000001.pkl"},
# Scenario 2: alternatives with custom names
# (optimization still runs, the pickled Accelerators
# will be appended)
2: {
"Conservative approach": "design-conservative.pkl",
"Aggressive tuning": "design-aggressive.pkl",
},
# Scenario 3: solution + alternatives
3: {
"Solution": "solution-000003.pkl",
"Tweaked design": "tweaked.pkl",
"Experimental config": "experimental.pkl",
},
}
- 0: Path to reference accelerator pickle, or None.
- ``scenarios[index]``: Sub-dictionary where keys are
:attr:`.Accelerator.name`, values are corresponding ``PKL``
|A| pickle files.
"""
parsed: dict[int, str | dict[str, str]] = {}
ref = pickle_config.pop("Reference", None)
if not isinstance(ref, (str, Path)) and ref is not None:
logging.error(
f"[files.pickle_paths] 'Reference' value is {ref}, but a "
"string is expected."
)
ref = None
if ref is not None:
parsed = {0: ref}
for scenario_key, scenario_data in pickle_config.items():
try:
index = int(scenario_key)
except (ValueError, TypeError):
logging.error(
f"Invalid scenario '{scenario_key = }' in pickle_paths. "
"Expected format: '000001', '000002', etc."
)
continue
if isinstance(scenario_data, str):
logging.error(
f"The key '{scenario_data}' in [files.pickle_paths."
f"scenarios.{scenario_key} was associated to the "
f"string '{scenario_data}', but only 'Reference' can "
"be associated to a string. To provide solution "
"tunings, use the dictionaries."
)
continue
parsed[index] = scenario_data
return parsed
[docs]
def _get_pickle_path(self, name: str, index: int = 0) -> Path | None:
"""Get pickle path for a named accelerator in :attr:`_pickle_paths`.
Parameters
----------
name :
Accelerator name to look up in pickle paths configuration.
index :
|FS| index. If not null, we look for ``name`` key in
``self._pickle_paths[index]`` subdict.
Returns
-------
Resolved absolute path if configured, None otherwise.
"""
if name == "Reference":
path = self._pickle_paths.get(0)
if path is None:
return
if isinstance(path, Path):
return path.resolve().absolute()
if isinstance(path, str):
return Path(path).resolve().absolute()
raise TypeError(
f"Reference Accelerator pickle {path = } could not be resolved"
"to a string nor a Path."
)
scenario = self._pickle_paths.get(index)
if scenario is None:
return
if isinstance(scenario, str):
raise ValueError(
f"The value associated to fault scenario #{index} in "
f"the AcceleratorFactory._pickle_paths attribute is {scenario}"
" but should be a 'dict[str, str]'."
)
path = scenario.get(name)
if path is None:
return
return Path(path).resolve().absolute()
[docs]
def _load_from_pickle(
self, name: str, index: int, pickle_path: Path
) -> Accelerator | None:
"""Load accelerator from pickle file if it exists.
Parameters
----------
name :
Accelerator name.
index :
Corresponding |FS| index. A null index is reserved for reference
accelerator.
pickle_path :
Path to pickle file.
Returns
-------
Loaded accelerator if file exists, None otherwise.
"""
if not pickle_path.is_file():
return None
logging.info(f"Loading {name} from pickle: {pickle_path}")
return Accelerator.from_pickle(
self.pickler, pickle_path, name=name, index=index
)
[docs]
def _load_additional_pickles(
self, reserved_names: set[str] = {"Reference", "Solution"}
) -> dict[int, list[Accelerator]]:
"""Unpickle additional |A|.
Parameters
----------
reserved_names :
Names already used for Reference/Solution accelerators; associated
paths are not unpickled.
Returns
-------
Additional accelerators loaded from pickle files, associated with
their |FS| index.
"""
additional: dict[int, list[Accelerator]] = {}
for index, names_paths in self._pickle_paths.items():
if index == 0 or isinstance(names_paths, str):
continue
accelerators = []
for pickle_name, raw_path in names_paths.items():
if pickle_name in reserved_names or raw_path is None:
continue
pickle_path = Path(raw_path).resolve().absolute()
accelerator = self._load_from_pickle(
pickle_name, index=index, pickle_path=pickle_path
)
if accelerator is None:
logging.debug(
f"Not unpickling '{pickle_name}' key in [files."
f"pickle_paths.{index}] because"
f" '{pickle_path}' does not exist."
)
continue
logging.info(
f"Loading additional accelerator '{accelerator.id}' from pickle."
)
accelerators.append(accelerator)
additional[index] = accelerators
return additional
# =========================================================================
# Deprecated kept for backward compatibility.
# =========================================================================
[docs]
def create_nominal(self) -> Accelerator:
"""Create the nominal linac.
.. deprecated:: 0.15.1
Prefer :meth:`.create_reference`.
"""
warn(
"The method create_nominal is deprecated. Prefer using create_reference.",
DeprecationWarning,
stacklevel=2,
)
return self.create_reference()
[docs]
def create_failed(self, n_objects: int) -> list[Accelerator]:
"""Create failed linac(s).
.. deprecated:: 0.15.1
Prefer :meth:`.create_all_broken`.
"""
warn(
"The method create_failed is deprecated. Prefer using create_all_broken.",
DeprecationWarning,
stacklevel=2,
)
accelerators = [
accelerator
for sorted_by_fault_scenario in self.create_all_broken(
n_objects
).values()
for accelerator in sorted_by_fault_scenario
]
return accelerators
# =========================================================================
# Deprecated kept for backward compatibility.
# =========================================================================
[docs]
class NoFault(AcceleratorFactory):
"""Create single accelerator without failure.
.. deprecated:: 0.15.0
Prefer :class:`AcceleratorFactory`.
"""
[docs]
def __init__(self, *args, **kwargs) -> None:
warn(
"The class NoFault is deprecated. Prefer using AcceleratorFactory.",
DeprecationWarning,
stacklevel=2,
)
return super().__init__(*args, **kwargs)
[docs]
def run(self, *args, **kwargs) -> Accelerator:
return self.create_reference()
[docs]
class WithFaults(AcceleratorFactory):
"""Create accelerators with failures.
.. deprecated:: 0.15.0
Prefer :class:`AcceleratorFactory`.
"""
[docs]
def __init__(self, *args, wtf: dict[str, Any], **kwargs) -> None:
warn(
"The class WithFaults is deprecated. Prefer using AcceleratorFactory.",
DeprecationWarning,
stacklevel=2,
)
self._wtf = wtf
return super().__init__(*args, **kwargs)
[docs]
def run_all(self, *args, **kwargs) -> list[Accelerator]:
reference = self.create_reference()
n_objects = len(self._wtf["failed"])
return [reference] + self.create_failed(n_objects)