Source code for aiida_crystal17.workflows.crystal_props.base

#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Copyright 2019 Chris Sewell
#
# This file is part of aiida-crystal17.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms and conditions
# of version 3 of the GNU Lesser General Public License.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Lesser General Public License for more details.
from collections.abc import Mapping
import copy

from aiida import orm
from aiida.common import AttributeDict
from aiida.engine import CalcJobProcessSpec, ToContext, WorkChain, if_
from aiida.manage.caching import disable_caching
from aiida.orm.nodes.data.base import to_aiida_type
from plumpy.ports import PortNamespace

from aiida_crystal17.calculations.cry_main import CryMainCalculation
from aiida_crystal17.calculations.prop_doss import CryDossCalculation
from aiida_crystal17.calculations.prop_ech3 import CryEch3Calculation
from aiida_crystal17.calculations.prop_ppan import CryPpanCalculation
from aiida_crystal17.data.input_params import CryInputParamsData


[docs]def expose_ports(port, port_namespace, exclude): for sub_port_name, sub_port in port_namespace.items(): if isinstance(sub_port, PortNamespace): sub_port_copy = copy.copy(sub_port) sub_port_copy._ports = {} # sub_port_copy.required = False sub_port_copy.populate_defaults = False port[sub_port_name] = sub_port_copy expose_ports(port[sub_port_name], sub_port, exclude) elif exclude is not None and sub_port_name in exclude: pass else: sub_port_copy = copy.deepcopy(sub_port) sub_port_copy.required = False port[sub_port_name] = sub_port_copy
[docs]def expose_optional_inputs(spec, namespace, process_class, exclude=None): spec.input_namespace(namespace, required=False) expose_ports(spec.inputs[namespace], process_class.spec().inputs, exclude)
[docs]def strip_empty_namespaces(data): """Create a dict, with no empty namespaces.""" new_data = {} for name, value in data.items(): if isinstance(value, Mapping): value = strip_empty_namespaces(value) if isinstance(value, Mapping) and len(value) == 0: pass else: new_data[name] = value return new_data
[docs]def get_builder_rerun_scf(calc_node): """Create a populated builder, from a previously run CryMainCalculation.""" if not calc_node.is_finished_ok: raise ValueError("The previous calculation failed") builder = calc_node.get_builder_restart() # we only want to run a single-point calculation, # so can remove any geometry optimisation. params = builder.parameters.get_dict() params.get("geometry", {}).pop("optimise", None) params.setdefault("scf", {}).pop("GUESSP", None) builder.parameters = CryInputParamsData(data=params) # use the final structure (output if the previous calculation was an optimization) if "structure" in calc_node.outputs: builder.structure = calc_node.outputs.structure # we want to use the final structure, so the input wavefunction will not apply if "wf_folder" in builder: builder.pop("wf_folder")
[docs]class CryPropertiesWorkChain(WorkChain): """A WorkChain to compute properties of a structure, using CRYSTAL. Either a pre-computed wavefunction (fort.9) file, or inputs for a CryMainCalculation, should be supplied. Inputs for property calculations can then be added (currently available; doss, ech3). """ _wf_fname = "fort.9" _scf_name = "scf" _scf_class = CryMainCalculation _cry_props = { "doss": CryDossCalculation, "ech3": CryEch3Calculation, "ppan": CryPpanCalculation, } def __init__(self, **kwargs): """Initialize inputs. Here we strip any empty workspaces from the inputs. This is because, as of v1.0.0b6, supplying a builder object to submit/run will include empty namespaces, e.g. ``{'scf': {'metadata': {'options': {}}, 'basissets': {}}}``. This is an issue for non-required namespaces, which have required ports, and will fail validation if these empty namespaces are present. """ # TODO This may have been fixed in fff3cadcc9572bbad32144d7db27da41c2c89c14, # raise an issue on aiida-core? if kwargs.get("inputs", None): kwargs["inputs"] = strip_empty_namespaces(kwargs["inputs"]) super(CryPropertiesWorkChain, self).__init__(**kwargs)
[docs] @classmethod def define(cls, spec: CalcJobProcessSpec): super(CryPropertiesWorkChain, cls).define(spec) # Input requires either a pre-computed wavefunction (fort.9) file, # or inputs for a Crystal Calculation spec.input( "wf_folder", valid_type=(orm.FolderData, orm.RemoteData, orm.SinglefileData), required=False, help="the folder containing the wavefunction fort.9 file", ) # expose_optional_inputs(spec, cls._scf_namespace, CryMainCalculation) spec.expose_inputs( cls._scf_class, namespace=cls._scf_name, namespace_options={"required": False, "populate_defaults": False}, ) spec.expose_outputs( cls._scf_class, namespace=cls._scf_name, namespace_options={"required": False}, ) # available property computations for pname, process_class in cls._cry_props.items(): spec.expose_inputs( process_class, namespace=pname, exclude=["wf_folder"], namespace_options={"required": False, "populate_defaults": False}, ) spec.expose_outputs( process_class, namespace=pname, namespace_options={"required": False} ) # additional input parameters spec.input( "check_remote", valid_type=orm.Bool, serializer=to_aiida_type, required=False, help=( "If a RemoteData wf_folder is input, check it contains the wavefunction file, " "before launching calculations. " "Note, this will fail if the remote computer is not immediately available" ), ) spec.input( "clean_workdir", valid_type=orm.Bool, serializer=to_aiida_type, required=False, help="If `True`, work directories of all called calculation will be cleaned at the end of execution.", ) spec.input( "test_run", valid_type=orm.Bool, required=False, serializer=to_aiida_type, help="break off the workchain before submitting a calculation", ) spec.outline( cls.check_inputs, if_(cls.check_wf_folder)( cls.submit_scf_calculation, cls.check_scf_calculation ), cls.submit_prop_calculations, cls.check_prop_calculations, ) spec.exit_code( 200, "END_OF_TEST_RUN", message=("Workchain ended before submitting calculation."), ) spec.exit_code( 201, "ERROR_NO_WF_INPUT", message=("Neither a wf_folder nor scf calculation was supplied."), ) spec.exit_code( 202, "ERROR_NO_PROP_INPUT", message=("No property calculation inputs were supplied."), ) spec.exit_code( 203, "ERROR_WF_FOLDER", message=("The supplied folder does contain the wavefunction file."), ) spec.exit_code( 210, "ERROR_SCF_SUBMISSION_FAILED", message=("The SCF calculation submission failed."), ) spec.exit_code( 301, "ERROR_SCF_CALC_FAILED", message=("The SCF calculation failed.") ) spec.exit_code( 302, "ERROR_PROP_CALC_FAILED", message=("One or more property calculations failed."), )
[docs] def check_inputs(self): """Check that necessary inputs have been supplied.""" if "wf_folder" not in self.inputs and "scf" not in self.inputs: return self.exit_codes.ERROR_NO_WF_INPUT prop_calc = False for prop_name in self._cry_props: if prop_name in self.inputs: prop_calc = True if not prop_calc: return self.exit_codes.ERROR_NO_PROP_INPUT
[docs] def check_wf_folder(self): """Check whether a wavefunction file has been supplied.""" if "wf_folder" not in self.inputs: self.report("No 'wf_folder' supplied, running SCF Calculation...") self.ctx.wf_folder = None return True # check the supplied folder contains the wavefunction file if isinstance(self.inputs.wf_folder, orm.FolderData): if self._wf_fname not in self.inputs.wf_folder.list_object_names(): return self.exit_codes.ERROR_WF_FOLDER elif isinstance(self.inputs.wf_folder, orm.RemoteData): if "check_remote" in self.inputs and self.inputs.check_remote.value: # TODO this should use the exponential backoff mechanism if self.inputs.wf_folder.is_empty: return self.exit_codes.ERROR_WF_FOLDER if self._wf_fname not in self.inputs.wf_folder.listdir(): return self.exit_codes.ERROR_WF_FOLDER self.report("Using supplied 'wf_folder'") self.ctx.wf_folder = self.inputs.wf_folder return False
[docs] def submit_scf_calculation(self): """Create and submit an SCF calculation.""" if "test_run" in self.inputs and self.inputs.test_run.value: self.report( "`test_run` specified, stopping before submitting scf calculation" ) return self.exit_codes.END_OF_TEST_RUN inputs = AttributeDict(self.exposed_inputs(self._scf_class, self._scf_name)) inputs["metadata"]["call_link_label"] = "calc_{}".format(self._scf_name) try: with disable_caching(): # even if the calculation has already been run, the remote folder may not be available future = self.submit(self._scf_class, **inputs) except Exception as err: self.report("SCF submission failed: {}".format(err)) return self.exit_codes.ERROR_SCF_SUBMISSION_FAILED self.report("launched SCF calculation: {}".format(future)) return ToContext(calc_scf=future)
[docs] def check_scf_calculation(self): """Check that the SCF calculation finished successfully, and add the remote folder to the context.""" if not self.ctx.calc_scf.is_finished_ok: self.report( "{} failed with exit code: {}".format( self.ctx.calc_scf, self.ctx.calc_scf.exit_status ) ) return self.exit_codes.ERROR_SCF_CALC_FAILED self.report("{} finished successfully".format(self.ctx.calc_scf)) self.ctx.wf_folder = self.ctx.calc_scf.outputs.remote_folder self.out_many( self.exposed_outputs( self.ctx.calc_scf, self._scf_class, namespace=self._scf_name ) )
[docs] def submit_prop_calculations(self): """Create and submit all property calculations.""" if "test_run" in self.inputs and self.inputs.test_run.value: self.report( "`test_run` specified, stopping before submitting property calculations" ) return self.exit_codes.END_OF_TEST_RUN for pname, process_class in self._cry_props.items(): if pname not in self.inputs: continue inputs = AttributeDict(self.exposed_inputs(process_class, pname)) inputs.wf_folder = self.ctx.wf_folder link_label = "calc_{}".format(pname) inputs["metadata"]["call_link_label"] = link_label inputs["metadata"]["options"]["input_wf_name"] = self._wf_fname future = self.submit(process_class, **inputs) self.report("launched {} calculation {}".format(pname, future)) self.to_context(**{link_label: future})
[docs] def check_prop_calculations(self): """Check that the property calculations finished successfully.""" all_successful = True for pname, process_class in self._cry_props.items(): link_label = "calc_{}".format(pname) if link_label not in self.ctx: continue calc_node = self.ctx[link_label] if not calc_node.is_finished_ok: self.report( "{}; {} failed with exit code: {}".format( link_label, calc_node, calc_node.exit_status ) ) all_successful = False continue self.report("{}; {} finished successfully".format(link_label, calc_node)) self.out_many( self.exposed_outputs(calc_node, process_class, namespace=pname) ) if not all_successful: return self.exit_codes.ERROR_PROP_CALC_FAILED
[docs] def on_terminated(self): """Clean the working directories of all child calculations if `clean_workdir=True` in the inputs.""" super(CryPropertiesWorkChain, self).on_terminated() if ( "clean_workdir" not in self.inputs or self.inputs.clean_workdir.value is False ): self.report("remote folders will not be cleaned") return cleaned_calcs = [] for called_descendant in self.node.called_descendants: if isinstance(called_descendant, orm.CalcJobNode): try: called_descendant.outputs.remote_folder._clean() # pylint: disable=protected-access cleaned_calcs.append(str(called_descendant.pk)) except (IOError, OSError, KeyError): pass if cleaned_calcs: self.report( "cleaned remote folders of calculations: {}".format( " ".join(cleaned_calcs) ) )