from aiida.common import AttributeDict, LinkType
from aiida.engine import if_, ToContext, WorkChain
from aiida.manage.caching import disable_caching
from aiida.orm import Bool, CalcJobNode
from aiida.orm.nodes.data.base import to_aiida_type
from aiida.plugins import CalculationFactory
from aiida_crystal17.data.input_params import CryInputParamsData
CryDossCalculation = CalculationFactory('crystal17.doss')
CryCalculation = CalculationFactory('crystal17.main')
[docs]class CryPropertiesWorkChain(WorkChain):
"""a WorkChain to compute properties of a structure, using CRYSTAL
A RemoteData node should be supplied that was created by a previous `CryMainCalculation`.
If the remote folder no longer exists or does not contain the wavefunction file
(usually fort.9 or designated by `inputs.doss.metadata.options.input_wf_name`),
the previous calculations inputs/outputs will be used to run an SCF calculation first.
Currently this work chain is only set up to run a DOSS calculation,
but in the future it would be intended to run a selection of property calculations.
"""
_doss_namespace = 'doss'
_calc_namespace = 'cry'
[docs] @classmethod
def define(cls, spec):
# yapf: disable
# pylint: disable=bad-continuation
super(CryPropertiesWorkChain, cls).define(spec)
spec.expose_inputs(CryDossCalculation, include=['wf_folder'])
spec.expose_inputs(CryDossCalculation, namespace=cls._doss_namespace, exclude=('wf_folder',))
spec.input('{}.metadata.options.resources'.format(cls._doss_namespace), valid_type=dict, required=False)
spec.input('{}.meta_options'.format(cls._calc_namespace),
valid_type=dict, required=False, non_db=True,
help='if supplied will update the original CryMainCalculations `metadata.options')
spec.input('clean_workdir', valid_type=Bool,
serializer=to_aiida_type, required=False,
help='If `True`, work directories of all called calculation will be cleaned at the end of execution.')
spec.input('test_run', valid_type=Bool, required=False, serializer=to_aiida_type,
help='break off the workchain before submitting a calculation')
spec.outline(
if_(cls.check_remote_folder)(
cls.submit_scf_calculation,
cls.check_scf_calculation
),
cls.submit_doss_calculation,
cls.check_doss_calculation
)
spec.expose_outputs(CryDossCalculation, namespace=cls._doss_namespace)
spec.exit_code(
200, 'END_OF_TEST_RUN',
message=('Workchain ended before submitting calculation'))
spec.exit_code(
201, 'ERROR_NO_INCOMING_CALC',
message=('The wf_folder does not contain a wavefunction file, '
'and was not created by a CryMainCalculation.'))
spec.exit_code(
202, 'ERROR_FAILED_INCOMING_CALC',
message=('The CryMainCalculation that created the wf_folder failed.'))
spec.exit_code(
203, 'ERROR_INCOMPLETE_INCOMING_CALC',
message=('The CryMainCalculation that created the wf_folder can not be used to restart a calculation.'))
spec.exit_code(
204, 'ERROR_SCF_CALC_FAILED',
message=('The SCF calculation failed.'))
spec.exit_code(
205, 'ERROR_DOSS_CALC_FAILED',
message=('The DOSS calculation failed'))
[docs] def check_remote_folder(self):
""" check that the remote folder contains the wavefunction file
"""
self.ctx.wf_folder = self.inputs.wf_folder
wf_filename = self.inputs.doss.metadata.options.input_wf_name
# TODO this should use the exponential backoff mechanism
if not self.inputs.wf_folder.is_empty:
content = self.inputs.wf_folder.listdir()
if wf_filename in content:
self.ctx.run_calc = False
return False
self.report("Remote folder {} does not contain '{}', attempting to rerun SCF calculation".format(
self.inputs.wf_folder, wf_filename))
self.ctx.run_calc = True
# TODO if re-running we should reset self.inputs.doss.metadata.options.input_wf_name
return True
[docs] def submit_scf_calculation(self):
"""Create and submit an SCF calculation,
created from the previous calculations inputs (and output structure if present).
Checks are made that the previous calculation was successful
"""
incoming = self.inputs.wf_folder.get_incoming(
node_class=CryCalculation, link_type=LinkType.CREATE).all_nodes()
if not incoming:
self.report("{} was not created by a CryMainCalculation".format(self.inputs.wf_folder))
return self.exit_codes.ERROR_NO_INCOMING_CALC
previous_calc = incoming[0]
if not previous_calc.is_finished_ok:
self.report("{} did not finish ok: {}".format(previous_calc, previous_calc.exit_status))
return self.exit_codes.ERROR_FAILED_INCOMING_CALC
# create a restart calculation
previous_calc = incoming[0]
builder = incoming[0].get_builder_restart()
# we only want to run a single-point calculation, so can remove any geometry optimisation
try:
params = builder.parameters.get_dict()
params.get("geometry", {}).pop("optimise", None)
except AttributeError:
self.report("{} has no `parameters` intput".format(previous_calc))
return self.exit_codes.ERROR_INCOMPLETE_INCOMING_CALC
self.ctx.calc_params = params
try:
self.ctx.calc_options = builder.metadata.options
except AttributeError:
self.report("{} has no `metadata.options` set".format(previous_calc))
return self.exit_codes.ERROR_INCOMPLETE_INCOMING_CALC
# if new metadata options have been supplied then use them
if "meta_options" in self.inputs.cry:
self.report("replacing metadata of calculation")
self.ctx.calc_options.update(self.inputs.cry["meta_options"])
# use the final structure (output if the previous calculation was an optimization)
if "structure" in previous_calc.outputs:
self.report("using optimised structure")
builder.structure = previous_calc.outputs.structure
# we want to use the final structure, so the input wavefunction will not apply
if "wf_folder" in builder:
builder.pop("wf_folder")
# TODO add a `remove_restarts` function to CryMainCalculation,
# to remove e.g. GUESSP, HESSOPT, RESTART keywords
self.ctx.calc_params.setdefault("scf", {}).pop("GUESSP", None)
builder.parameters = CryInputParamsData(data=params)
builder.metadata.options = self.ctx.calc_options
if 'test_run' in self.inputs and self.inputs.test_run.value:
self.report("`test_run` specified, stopping before submitting scf calculation")
return self.exit_codes.END_OF_TEST_RUN
# TODO could submit CryMainBaseWorkChain
builder.metadata.call_link_label = "scf_calc"
try:
with disable_caching():
calculation = self.submit(builder)
except Exception as err:
self.report("{} submission failed: {}".format(previous_calc, err))
return self.exit_codes.ERROR_INCOMPLETE_INCOMING_CALC
self.report('launching SCF calculation {}'.format(calculation))
return ToContext(calc_scf=calculation)
[docs] def check_scf_calculation(self):
if not self.ctx.calc_scf.is_finished_ok:
self.report("{} failed with exit code: {}".format(self.ctx.calc_scf, self.ctx.calc_scf.exit_status))
return self.exit_codes.ERROR_SCF_CALC_FAILED
self.report("{} finished successfully".format(self.ctx.calc_scf))
self.ctx.wf_folder = self.ctx.calc_scf.outputs.remote_folder
[docs] def submit_doss_calculation(self):
if 'test_run' in self.inputs and self.inputs.test_run.value:
self.report("`test_run` specified, stopping before submitting doss calculation")
return self.exit_codes.END_OF_TEST_RUN
inputs = AttributeDict(self.exposed_inputs(CryDossCalculation, self._doss_namespace))
inputs.wf_folder = self.ctx.wf_folder
inputs['metadata']['call_link_label'] = "doss_calc"
calculation = self.submit(CryDossCalculation, **inputs)
self.report('launching DOSS calculation {}'.format(calculation))
return ToContext(calc_doss=calculation)
[docs] def check_doss_calculation(self):
if not self.ctx.calc_doss.is_finished_ok:
self.report("{} failed with exit code: {}".format(
self.ctx.calc_doss, self.ctx.calc_doss.exit_status))
return self.exit_codes.ERROR_DOSS_CALC_FAILED
self.report("{} finished successfully".format(self.ctx.calc_doss))
namespace_separator = self.spec().namespace_separator
for link_triple in self.ctx.calc_doss.get_outgoing(link_type=LinkType.CREATE).link_triples:
self.out(self._doss_namespace + namespace_separator + link_triple.link_label, link_triple.node)
[docs] def on_terminated(self):
"""Clean the working directories of all child calculations if `clean_workdir=True` in the inputs."""
super(CryPropertiesWorkChain, self).on_terminated()
if "clean_workdir" not in self.inputs or self.inputs.clean_workdir.value is False:
self.report('remote folders will not be cleaned')
return
cleaned_calcs = []
for called_descendant in self.node.called_descendants:
if isinstance(called_descendant, CalcJobNode):
try:
called_descendant.outputs.remote_folder._clean() # pylint: disable=protected-access
cleaned_calcs.append(str(called_descendant.pk))
except (IOError, OSError, KeyError):
pass
if cleaned_calcs:
self.report('cleaned remote folders of calculations: {}'.format(
' '.join(cleaned_calcs)))